Stopping a streaming app from its own code: behaviour change from 1.3 to 1.6


LINZ, Arnaud

Hello,

 

In Flink 1.3, I was able to cleanly stop an HA streaming application simply by returning from the source's run() method once an end condition was met.

I am trying to upgrade my code to Flink 1.6.2, but this no longer works.
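The stopping approach described above, where a source ends the job by returning from run() once its end condition holds, can be sketched without Flink on the classpath. In this minimal sketch, `SimpleSource` is a hypothetical stand-in for Flink's `SourceFunction` (a `Consumer` plays the role of `SourceContext.collect`), and `CountingSource` with its `limit` is likewise illustrative. The `volatile` running flag is similar to the pattern in the `SourceFunction` Javadoc example, letting `cancel()`, which is called from another thread, break out of the emission loop.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's SourceFunction contract, so that this
// sketch compiles and runs without Flink on the classpath.
interface SimpleSource<T> {
    // Emits records through the collector; returning from run() ends the stream.
    void run(Consumer<T> collector) throws Exception;

    // Asks a running run() call, typically from another thread, to stop early.
    void cancel();
}

// A bounded source: run() returns once the end condition (limit reached) holds,
// or earlier if cancel() has been called.
class CountingSource implements SimpleSource<String> {
    private final int limit;                  // hypothetical end condition
    private volatile boolean running = true;  // written by cancel(), read by run()

    CountingSource(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(Consumer<String> collector) {
        for (int count = 0; running && count < limit; count++) {
            collector.accept(String.valueOf(count));
        }
        // Falling out of the loop is the whole "stop" mechanism: the source is done.
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```

In a real Flink job the loop body would call `ctx.collect(...)` inside `run(SourceContext<T> ctx)`; a source task finishes once run() returns, and the job is expected to switch to FINISHED when all of its tasks have finished.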

 

Even though no source is running anymore and there are no items left to process, the cluster keeps running forever, endlessly logging messages like this one:

Checkpoint triggering task Source: Custom Source (1/2) of job 3b286f5344c50f0e466bb8ee79a2bb69 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.

 

Why has this behaviour changed? How am I now supposed to stop a streaming job from within its own code? Would https://issues.apache.org/jira/browse/FLINK-2111 help?

 

Thanks,

Arnaud

 





The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

RE: Stopping a streaming app from its own code: behaviour change from 1.3 to 1.6

LINZ, Arnaud

Hello,

 

This has nothing to do with HA: all my unit tests involving a streaming application now hang in an infinite execution.

This simple code never ends:

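    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.junit.Test;

    // Hypothetical class name; the original message showed only the test method.
    public class SimpleStreamingTest {

        @Test
        public void testFlink163() throws Exception {
            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // get input data: a bounded source that emits "0".."4", then returns from run()
            final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
                @Override
                public void run(final SourceContext<String> ctx) throws Exception {
                    for (int count = 0; count < 5; count++) {
                        ctx.collect(String.valueOf(count));
                    }
                }

                @Override
                public void cancel() {
                    // nothing to do: run() always terminates on its own
                }
            });

            text.print().setParallelism(1);

            env.execute("Simple Test");
            // Never ends!
        }
    }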

Is this really a new feature, or a critical bug?

In the log, the job switches from RUNNING to FINISHED and the TaskExecutor is then stopped:

[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)

Yet execute() never returns.
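Until the hang is resolved, one way to keep such a unit test from blocking the build is to run the blocking call on a separate thread and fail after a timeout. This is a workaround sketch, not a fix: the stuck call is only interrupted on a best-effort basis. `Timeouts.callWithTimeout` is a hypothetical helper built on `java.util.concurrent`; JUnit 4's `@Test(timeout = ...)` attribute gives a similar fail-fast effect without extra code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a blocking call on a daemon thread and gives up
// after the given timeout instead of waiting forever.
final class Timeouts {
    private Timeouts() {}

    static <T> T callWithTimeout(Callable<T> call, long timeout, TimeUnit unit)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "blocking-call");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        try {
            Future<T> future = executor.submit(call);
            try {
                return future.get(timeout, unit);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the stuck call (best effort)
                throw e;
            } catch (ExecutionException e) {
                // Rethrow the call's own exception rather than the wrapper.
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In the test above, the last call could become something like `Timeouts.callWithTimeout(() -> env.execute("Simple Test"), 60, TimeUnit.SECONDS);`, where the 60-second limit is an arbitrary choice.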

 

Arnaud

 

Here is the full log:

[2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini cluster (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)

[2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:227)

[2018-11-07 11:11:11,636] INFO Starting Metrics Registry (org.apache.flink.runtime.minicluster.MiniCluster:238)

[2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics will be exposed/reported. (org.apache.flink.runtime.metrics.MetricRegistryImpl:113)

[2018-11-07 11:11:11,703] INFO Starting RPC Service(s) (org.apache.flink.runtime.minicluster.MiniCluster:249)

[2018-11-07 11:11:12,244] INFO Slf4jLogger started (akka.event.slf4j.Slf4jLogger:92)

[2018-11-07 11:11:12,264] INFO Starting high-availability services (org.apache.flink.runtime.minicluster.MiniCluster:290)

[2018-11-07 11:11:12,367] INFO Created BLOB server storage directory C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108 (org.apache.flink.runtime.blob.BlobServer:141)

[2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max concurrent requests: 50 - max backlog: 1000 (org.apache.flink.runtime.blob.BlobServer:203)

[2018-11-07 11:11:12,380] INFO Starting ResourceManger (org.apache.flink.runtime.minicluster.MiniCluster:301)

[2018-11-07 11:11:12,409] INFO Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:12,432] INFO Proposing leadership to contender org.apache.flink.runtime.resourcemanager.StandaloneResourceManager@5b1f29fa @ akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:12,439] INFO ResourceManager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was granted leadership with fencing token 86394924fb97bad612b67f526f84406f (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953)

[2018-11-07 11:11:12,440] INFO Starting the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185)

[2018-11-07 11:11:12,442] INFO Received confirmation of leadership for leader akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 , session=12b67f52-6f84-406f-8639-4924fb97bad6 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f (org.apache.flink.runtime.blob.PermanentBlobCache:107)

[2018-11-07 11:11:12,454] INFO Created BLOB cache storage directory C:\Users\alinz\AppData\Local\Temp\blobStore-df6c61d2-3c51-4335-a96e-6b00c82e4d90 (org.apache.flink.runtime.blob.TransientBlobCache:107)

[2018-11-07 11:11:12,454] INFO Starting 1 TaskManger(s) (org.apache.flink.runtime.minicluster.MiniCluster:316)

[2018-11-07 11:11:12,460] INFO Starting TaskManager with ResourceID: e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (org.apache.flink.runtime.taskexecutor.TaskManagerRunner:352)

[2018-11-07 11:11:12,531] INFO Temporary file directory 'C:\Users\alinz\AppData\Local\Temp': total 476 GB, usable 149 GB (31,30% usable) (org.apache.flink.runtime.taskexecutor.TaskManagerServices:720)

[2018-11-07 11:11:12,757] INFO Allocated 396 MB for network buffer pool (number of memory segments: 12686, bytes per segment: 32768). (org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:114)

[2018-11-07 11:11:12,765] INFO Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder. (org.apache.flink.runtime.query.QueryableStateUtils:84)

[2018-11-07 11:11:12,765] INFO Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder. (org.apache.flink.runtime.query.QueryableStateUtils:141)

[2018-11-07 11:11:12,766] INFO Starting the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:304)

[2018-11-07 11:11:12,768] WARN No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted. (org.apache.flink.runtime.taskmanager.TaskManagerLocation:102)

[2018-11-07 11:11:12,769] INFO Limiting managed memory to 0.7 of the currently free heap space (2493 MB), memory will be allocated lazily. (org.apache.flink.runtime.taskexecutor.TaskManagerServices:331)

[2018-11-07 11:11:12,776] INFO I/O manager uses directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 for spill files. (org.apache.flink.runtime.io.disk.iomanager.IOManager:95)

[2018-11-07 11:11:12,793] INFO Messages have a max timeout of 10000 ms (org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration:188)

[2018-11-07 11:11:12,803] INFO Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:12,813] INFO Start job leader service. (org.apache.flink.runtime.taskexecutor.JobLeaderService:118)

[2018-11-07 11:11:12,814] INFO Connecting to ResourceManager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f). (org.apache.flink.runtime.taskexecutor.TaskExecutor:904)

[2018-11-07 11:11:12,814] INFO User file cache uses directory C:\Users\alinz\AppData\Local\Temp\flink-dist-cache-648ab4eb-f39c-4262-a3cc-07adfa6e5b43 (org.apache.flink.runtime.filecache.FileCache:107)

[2018-11-07 11:11:12,815] INFO Starting dispatcher rest endpoint. (org.apache.flink.runtime.minicluster.MiniCluster:327)

[2018-11-07 11:11:12,845] INFO Resolved ResourceManager address, beginning registration (org.apache.flink.runtime.taskexecutor.TaskExecutor:201)

[2018-11-07 11:11:12,846] INFO Registration at ResourceManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.taskexecutor.TaskExecutor:250)

[2018-11-07 11:11:12,853] INFO Registering TaskManager with ResourceID e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (akka://flink/user/taskmanager_0) at ResourceManager (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:727)

[2018-11-07 11:11:12,855] INFO Successful registration at resource manager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 under registration id 8098d3fe3fe83133051c3bb97bf96d37. (org.apache.flink.runtime.taskexecutor.TaskExecutor:94)

[2018-11-07 11:11:12,877] INFO Starting rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:128)

[2018-11-07 11:11:13,168] WARN Log file environment variable 'log.file' is not set. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:95)

[2018-11-07 11:11:13,169] WARN JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:101)

[2018-11-07 11:11:13,195] INFO Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:115)

[2018-11-07 11:11:13,514] INFO Rest endpoint listening at localhost:64523 (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:200)

[2018-11-07 11:11:13,514] INFO Proposing leadership to contender org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint@1f86099a @ http://localhost:64523 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,514] INFO Starting job dispatcher(s) for JobManger (org.apache.flink.runtime.minicluster.MiniCluster:364)

[2018-11-07 11:11:13,514] INFO http://localhost:64523 was granted leadership with leaderSessionID=940dc860-bbcb-4e35-8255-59702b220383 (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:757)

[2018-11-07 11:11:13,515] INFO Received confirmation of leadership for leader http://localhost:64523 , session=940dc860-bbcb-4e35-8255-59702b220383 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,523] INFO Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,537] INFO Proposing leadership to contender org.apache.flink.runtime.dispatcher.StandaloneDispatcher@27e32fe4 @ akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,538] INFO Flink Mini Cluster started successfully (org.apache.flink.runtime.minicluster.MiniCluster:410)

[2018-11-07 11:11:13,538] INFO Dispatcher akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a was granted leadership with fencing token d7ae33ab-20bb-4598-bc6d-b7a57ca66860 (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:818)

[2018-11-07 11:11:13,549] INFO Recovering all persisted jobs. (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:658)

[2018-11-07 11:11:13,550] INFO Received confirmation of leadership for leader akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a , session=d7ae33ab-20bb-4598-bc6d-b7a57ca66860 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,566] INFO Submitting job 0ef8697ca98f6d2b565ed928d17c8a49 (Simple Test). (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:247)

[2018-11-07 11:11:13,592] INFO Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_1 . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,618] INFO Initializing job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:269)

[2018-11-07 11:11:13,623] INFO Using restart strategy NoRestartStrategy for Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:280)

[2018-11-07 11:11:13,629] INFO Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/96fe7c78-28fe-484f-ae16-dcd1d4bc2c6b . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,654] INFO Job recovers via failover strategy: full graph restart (org.apache.flink.runtime.executiongraph.ExecutionGraph:425)

[2018-11-07 11:11:13,689] INFO Running initialization on master for job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:195)

[2018-11-07 11:11:13,689] INFO Successfully ran initialization on master in 0 ms. (org.apache.flink.runtime.jobmaster.JobMaster:224)

[2018-11-07 11:11:13,722] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.runtime.jobmaster.JobMaster:230)

[2018-11-07 11:11:13,740] INFO Proposing leadership to contender org.apache.flink.runtime.jobmaster.JobManagerRunner@34c7d1b6 @ akka://flink/user/jobmanager_1 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,740] INFO JobManager runner for job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) was granted leadership with session id 56e8324b-0015-4464-b6c7-ba0accdcec2a at akka://flink/user/jobmanager_1. (org.apache.flink.runtime.jobmaster.JobManagerRunner:329)

[2018-11-07 11:11:13,743] INFO Starting execution of job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) (org.apache.flink.runtime.jobmaster.JobMaster:1009)

[2018-11-07 11:11:13,744] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state CREATED to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)

[2018-11-07 11:11:13,747] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to SCHEDULED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,753] INFO Connecting to ResourceManager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f) (org.apache.flink.runtime.jobmaster.JobMaster:1285)

[2018-11-07 11:11:13,754] INFO Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=56e8324b-0015-4464-b6c7-ba0accdcec2a (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,757] INFO Resolved ResourceManager address, beginning registration (org.apache.flink.runtime.jobmaster.JobMaster:201)

[2018-11-07 11:11:13,758] INFO Registration at ResourceManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.jobmaster.JobMaster:250)

[2018-11-07 11:11:13,763] INFO Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:733)

[2018-11-07 11:11:13,767] INFO Registering job manager b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:298)

[2018-11-07 11:11:13,771] INFO Registered job manager b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:672)

[2018-11-07 11:11:13,772] INFO JobManager successfully registered at ResourceManager, leader id: 86394924fb97bad612b67f526f84406f. (org.apache.flink.runtime.jobmaster.JobMaster:1307)

[2018-11-07 11:11:13,773] INFO Requesting new slot [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager. (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:689)

[2018-11-07 11:11:13,776] INFO Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 0ef8697ca98f6d2b565ed928d17c8a49 with allocation id AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:422)

[2018-11-07 11:11:13,778] INFO Receive slot request AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a} for job 0ef8697ca98f6d2b565ed928d17c8a49 from resource manager with leader id 86394924fb97bad612b67f526f84406f. (org.apache.flink.runtime.taskexecutor.TaskExecutor:743)

[2018-11-07 11:11:13,779] INFO Allocated slot for AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.taskexecutor.TaskExecutor:755)

[2018-11-07 11:11:13,779] INFO Add job 0ef8697ca98f6d2b565ed928d17c8a49 for job leader monitoring. (org.apache.flink.runtime.taskexecutor.JobLeaderService:186)

[2018-11-07 11:11:13,781] INFO Try to register at job manager akka://flink/user/jobmanager_1 with leader id 56e8324b-0015-4464-b6c7-ba0accdcec2a. (org.apache.flink.runtime.taskexecutor.JobLeaderService:326)

[2018-11-07 11:11:13,782] INFO Resolved JobManager address, beginning registration (org.apache.flink.runtime.taskexecutor.JobLeaderService:201)

[2018-11-07 11:11:13,782] INFO Registration at JobManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.taskexecutor.JobLeaderService:250)

[2018-11-07 11:11:13,785] INFO Successful registration at job manager akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.JobLeaderService:374)

[2018-11-07 11:11:13,786] INFO Establish JobManager connection for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1137)

[2018-11-07 11:11:13,789] INFO Offer reserved slots to the leader of job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1042)

[2018-11-07 11:11:13,794] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from SCHEDULED to DEPLOYING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,794] INFO Deploying Source: Custom Source -> Sink: Print to Std. Out (1/1) (attempt #0) to 127.0.0.1 (org.apache.flink.runtime.executiongraph.ExecutionGraph:576)

[2018-11-07 11:11:13,819] INFO Received task Source: Custom Source -> Sink: Print to Std. Out (1/1). (org.apache.flink.runtime.taskexecutor.TaskExecutor:541)

[2018-11-07 11:11:13,820] INFO Activate slot AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable:237)

[2018-11-07 11:11:13,820] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to DEPLOYING. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,820] INFO Creating FileSystem stream leak safety net for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING] (org.apache.flink.runtime.taskmanager.Task:579)

[2018-11-07 11:11:13,828] INFO Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:586)

[2018-11-07 11:11:13,829] INFO Registering task at network: Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:612)

[2018-11-07 11:11:13,836] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,840] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.streaming.runtime.tasks.StreamTask:230)

0

1

2

3

4

[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). (org.apache.flink.runtime.taskmanager.Task:818)

[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] (org.apache.flink.runtime.taskmanager.Task:845)

[2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)

[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,907] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)

[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)

[2018-11-07 11:11:13,908] INFO Shutting down (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)

[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:427)

[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)

[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)

[2018-11-07 11:11:23,583] INFO Shutting down TaskExecutorLocalStateStoresManager. (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)

[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)

[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)

[2018-11-07 11:11:23,591] INFO Removing cache directory C:\Users\alinz\AppData\Local\Temp\flink-web-ui (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)

[2018-11-07 11:11:23,593] INFO Closing the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)

[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)

[2018-11-07 11:11:23,596] INFO Close ResourceManager connection cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)

[2018-11-07 11:11:23,607] INFO Stop job leader service. (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)

[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)

 

 

 

From: LINZ, Arnaud
Sent: Tuesday, 6 November 2018 14:23
To: user <[hidden email]>
Subject: Stopping a streaming app from its own code: behaviour change from 1.3 to 1.6

 


RE: Stopping a streaming app from its own code: behaviour change from 1.3 to 1.6

LINZ, Arnaud

FYI, the test code from my previous message terminates with Flink 1.6.0 but not with 1.6.1. I suspect this is a bug rather than a new feature.

 

From: LINZ, Arnaud
Sent: Wednesday, 7 November 2018 11:14
To: 'user' <[hidden email]>
Subject: RE: Stopping a streaming app from its own code: behaviour change from 1.3 to 1.6

 


[2018-11-07 11:11:12,814] INFO User file cache uses directory C:\Users\alinz\AppData\Local\Temp\flink-dist-cache-648ab4eb-f39c-4262-a3cc-07adfa6e5b43 (org.apache.flink.runtime.filecache.FileCache:107)

[2018-11-07 11:11:12,815] INFO Starting dispatcher rest endpoint. (org.apache.flink.runtime.minicluster.MiniCluster:327)

[2018-11-07 11:11:12,845] INFO Resolved ResourceManager address, beginning registration (org.apache.flink.runtime.taskexecutor.TaskExecutor:201)

[2018-11-07 11:11:12,846] INFO Registration at ResourceManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.taskexecutor.TaskExecutor:250)

[2018-11-07 11:11:12,853] INFO Registering TaskManager with ResourceID e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (akka://flink/user/taskmanager_0) at ResourceManager (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:727)

[2018-11-07 11:11:12,855] INFO Successful registration at resource manager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 under registration id 8098d3fe3fe83133051c3bb97bf96d37. (org.apache.flink.runtime.taskexecutor.TaskExecutor:94)

[2018-11-07 11:11:12,877] INFO Starting rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:128)

[2018-11-07 11:11:13,168] WARN Log file environment variable 'log.file' is not set. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:95)

[2018-11-07 11:11:13,169] WARN JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:101)

[2018-11-07 11:11:13,195] INFO Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:115)

[2018-11-07 11:11:13,514] INFO Rest endpoint listening at localhost:64523 (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:200)

[2018-11-07 11:11:13,514] INFO Proposing leadership to contender [hidden email] @ http://localhost:64523 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,514] INFO Starting job dispatcher(s) for JobManger (org.apache.flink.runtime.minicluster.MiniCluster:364)

[2018-11-07 11:11:13,514] INFO http://localhost:64523 was granted leadership with leaderSessionID=940dc860-bbcb-4e35-8255-59702b220383 (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:757)

[2018-11-07 11:11:13,515] INFO Received confirmation of leadership for leader http://localhost:64523 , session=940dc860-bbcb-4e35-8255-59702b220383 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,523] INFO Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,537] INFO Proposing leadership to contender [hidden email] @ akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,538] INFO Flink Mini Cluster started successfully (org.apache.flink.runtime.minicluster.MiniCluster:410)

[2018-11-07 11:11:13,538] INFO Dispatcher akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a was granted leadership with fencing token d7ae33ab-20bb-4598-bc6d-b7a57ca66860 (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:818)

[2018-11-07 11:11:13,549] INFO Recovering all persisted jobs. (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:658)

[2018-11-07 11:11:13,550] INFO Received confirmation of leadership for leader akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a , session=d7ae33ab-20bb-4598-bc6d-b7a57ca66860 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,566] INFO Submitting job 0ef8697ca98f6d2b565ed928d17c8a49 (Simple Test). (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:247)

[2018-11-07 11:11:13,592] INFO Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_1 . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,618] INFO Initializing job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:269)

[2018-11-07 11:11:13,623] INFO Using restart strategy NoRestartStrategy for Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:280)

[2018-11-07 11:11:13,629] INFO Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/96fe7c78-28fe-484f-ae16-dcd1d4bc2c6b . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,654] INFO Job recovers via failover strategy: full graph restart (org.apache.flink.runtime.executiongraph.ExecutionGraph:425)

[2018-11-07 11:11:13,689] INFO Running initialization on master for job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:195)

[2018-11-07 11:11:13,689] INFO Successfully ran initialization on master in 0 ms. (org.apache.flink.runtime.jobmaster.JobMaster:224)

[2018-11-07 11:11:13,722] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.runtime.jobmaster.JobMaster:230)

[2018-11-07 11:11:13,740] INFO Proposing leadership to contender [hidden email] @ akka://flink/user/jobmanager_1 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,740] INFO JobManager runner for job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) was granted leadership with session id 56e8324b-0015-4464-b6c7-ba0accdcec2a at akka://flink/user/jobmanager_1. (org.apache.flink.runtime.jobmaster.JobManagerRunner:329)

[2018-11-07 11:11:13,743] INFO Starting execution of job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) (org.apache.flink.runtime.jobmaster.JobMaster:1009)

[2018-11-07 11:11:13,744] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state CREATED to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)

[2018-11-07 11:11:13,747] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to SCHEDULED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,753] INFO Connecting to ResourceManager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f) (org.apache.flink.runtime.jobmaster.JobMaster:1285)

[2018-11-07 11:11:13,754] INFO Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=56e8324b-0015-4464-b6c7-ba0accdcec2a (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,757] INFO Resolved ResourceManager address, beginning registration (org.apache.flink.runtime.jobmaster.JobMaster:201)

[2018-11-07 11:11:13,758] INFO Registration at ResourceManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.jobmaster.JobMaster:250)

[2018-11-07 11:11:13,763] INFO Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:733)

[2018-11-07 11:11:13,767] INFO Registering job manager b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:298)

[2018-11-07 11:11:13,771] INFO Registered job manager b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:672)

[2018-11-07 11:11:13,772] INFO JobManager successfully registered at ResourceManager, leader id: 86394924fb97bad612b67f526f84406f. (org.apache.flink.runtime.jobmaster.JobMaster:1307)

[2018-11-07 11:11:13,773] INFO Requesting new slot [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager. (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:689)

[2018-11-07 11:11:13,776] INFO Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 0ef8697ca98f6d2b565ed928d17c8a49 with allocation id AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:422)

[2018-11-07 11:11:13,778] INFO Receive slot request AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a} for job 0ef8697ca98f6d2b565ed928d17c8a49 from resource manager with leader id 86394924fb97bad612b67f526f84406f. (org.apache.flink.runtime.taskexecutor.TaskExecutor:743)

[2018-11-07 11:11:13,779] INFO Allocated slot for AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.taskexecutor.TaskExecutor:755)

[2018-11-07 11:11:13,779] INFO Add job 0ef8697ca98f6d2b565ed928d17c8a49 for job leader monitoring. (org.apache.flink.runtime.taskexecutor.JobLeaderService:186)

[2018-11-07 11:11:13,781] INFO Try to register at job manager akka://flink/user/jobmanager_1 with leader id 56e8324b-0015-4464-b6c7-ba0accdcec2a. (org.apache.flink.runtime.taskexecutor.JobLeaderService:326)

[2018-11-07 11:11:13,782] INFO Resolved JobManager address, beginning registration (org.apache.flink.runtime.taskexecutor.JobLeaderService:201)

[2018-11-07 11:11:13,782] INFO Registration at JobManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.taskexecutor.JobLeaderService:250)

[2018-11-07 11:11:13,785] INFO Successful registration at job manager akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.JobLeaderService:374)

[2018-11-07 11:11:13,786] INFO Establish JobManager connection for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1137)

[2018-11-07 11:11:13,789] INFO Offer reserved slots to the leader of job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1042)

[2018-11-07 11:11:13,794] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from SCHEDULED to DEPLOYING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,794] INFO Deploying Source: Custom Source -> Sink: Print to Std. Out (1/1) (attempt #0) to 127.0.0.1 (org.apache.flink.runtime.executiongraph.ExecutionGraph:576)

[2018-11-07 11:11:13,819] INFO Received task Source: Custom Source -> Sink: Print to Std. Out (1/1). (org.apache.flink.runtime.taskexecutor.TaskExecutor:541)

[2018-11-07 11:11:13,820] INFO Activate slot AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable:237)

[2018-11-07 11:11:13,820] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to DEPLOYING. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,820] INFO Creating FileSystem stream leak safety net for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING] (org.apache.flink.runtime.taskmanager.Task:579)

[2018-11-07 11:11:13,828] INFO Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:586)

[2018-11-07 11:11:13,829] INFO Registering task at network: Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:612)

[2018-11-07 11:11:13,836] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,840] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.streaming.runtime.tasks.StreamTask:230)

0

1

2

3

4

[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). (org.apache.flink.runtime.taskmanager.Task:818)

[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] (org.apache.flink.runtime.taskmanager.Task:845)

[2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)

[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,907] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)

[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)

[2018-11-07 11:11:13,908] INFO Shutting down (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)

[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:427)

[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)

[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)

[2018-11-07 11:11:23,583] INFO Shutting down TaskExecutorLocalStateStoresManager. (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)

[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)

[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)

[2018-11-07 11:11:23,591] INFO Removing cache directory C:\Users\alinz\AppData\Local\Temp\flink-web-ui (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)

[2018-11-07 11:11:23,593] INFO Closing the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)

[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)

[2018-11-07 11:11:23,596] INFO Close ResourceManager connection cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)

[2018-11-07 11:11:23,607] INFO Stop job leader service. (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)

[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)

 

 

 

From: LINZ, Arnaud
Sent: Tuesday, November 6, 2018 14:23
To: user <[hidden email]>
Subject: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

 

RE: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

LINZ, Arnaud

Created FLINK-10832 (with considerable difficulty, as typing Java code in a JIRA description was an awful experience :-))

 

 

From: LINZ, Arnaud
Sent: Wednesday, November 7, 2018 11:43
To: 'user' <[hidden email]>
Subject: RE: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

 

FYI, the code below terminates with version 1.6.0 but does not terminate with 1.6.1. I suspect it is a bug rather than a new feature.

 

From: LINZ, Arnaud
Sent: Wednesday, November 7, 2018 11:14
To: 'user' <[hidden email]>
Subject: RE: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

 

Hello,

 

This has nothing to do with HA. All my unit tests involving a streaming app now hang in an "infinite execution" and never terminate.

This simple code never ends:

    @Test
    public void testFlink162() throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data: a bounded source that emits "0" to "4" and then returns from run()
        final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(final SourceContext<String> ctx) throws Exception {
                for (int count = 0; count < 5; count++) {
                    ctx.collect(String.valueOf(count));
                }
                // returning here should mark the source, and then the job, as FINISHED
            }

            @Override
            public void cancel() {
            }
        });
        text.print().setParallelism(1);
        env.execute("Simple Test");
        // Never returns!
    }
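For comparison, a common shape for a source that stops on its own (an ending condition) and can also be stopped from outside (via cancel()) is a loop guarded by a volatile flag. This is only a minimal sketch under stated assumptions: to keep it compilable without Flink on the classpath, it uses a hypothetical `BoundedSource` interface and a hypothetical `CountingSource` class that merely mimic the run()/cancel() shape of Flink's `SourceFunction`; neither is a Flink API. In a real `SourceFunction`, elements would be emitted with `ctx.collect(...)`, typically inside `synchronized (ctx.getCheckpointLock())`. The sketch illustrates the intended termination contract (returning from run() ends the source); it does not work around the hang reported here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Flink's SourceFunction (run/cancel), so the
// pattern can be compiled and exercised without Flink on the classpath.
interface BoundedSource<T> {
    void run(Consumer<T> collector) throws Exception;
    void cancel();
}

// Hypothetical bounded source: stops when its ending condition is met
// or when cancel() is called. Returning from run() ends the source.
class CountingSource implements BoundedSource<String> {
    private final int limit;
    // volatile: cancel() is usually called from a different thread than run()
    private volatile boolean running = true;

    CountingSource(int limit) {
        this.limit = limit;
    }

    @Override
    public void run(Consumer<String> collector) {
        for (int count = 0; running && count < limit; count++) {
            collector.accept(String.valueOf(count));
        }
        // Falling out of the loop is the "clean stop" of the source.
    }

    @Override
    public void cancel() {
        running = false;
    }
}

public class BoundedSourceDemo {
    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        new CountingSource(5).run(out::add);
        System.out.println(out); // [0, 1, 2, 3, 4]
    }
}
```

Checking both `running` and the ending condition in the loop header lets the same source end either naturally or on cancellation.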

Is this really a new feature or a critical bug?

In the log, the TaskExecutor is stopped:

[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)

But execute() does not return.
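Until the behaviour is clarified, one way to keep such a test from blocking a build forever is to run the blocking call on a separate thread with a deadline. The following is a minimal, stdlib-only sketch under that assumption; `Watchdog` and `callWithTimeout` are hypothetical helper names, not part of Flink or JUnit. A timeout only makes the test fail fast: the worker thread is interrupted on a best-effort basis and the embedded mini cluster may keep running in the background, which is why the worker thread is marked as a daemon.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class Watchdog {
    private Watchdog() {}

    // Runs a blocking call on a daemon thread and waits at most `timeout`.
    // Throws TimeoutException instead of hanging the caller.
    public static <T> T callWithTimeout(Callable<T> call, long timeout, TimeUnit unit)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "watchdog-call");
            t.setDaemon(true); // a call that never returns will not keep the JVM alive
            return t;
        });
        try {
            Future<T> future = executor.submit(call);
            try {
                return future.get(timeout, unit);
            } catch (TimeoutException e) {
                future.cancel(true); // best effort: interrupts the worker thread
                throw e;
            } catch (ExecutionException e) {
                // Unwrap so the caller sees the original failure
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause;
                }
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callWithTimeout(() -> 42, 1, TimeUnit.SECONDS)); // 42
        try {
            callWithTimeout(() -> { Thread.sleep(10_000); return 0; }, 100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

In the test above, the call would become `Watchdog.callWithTimeout(() -> env.execute("Simple Test"), 60, TimeUnit.SECONDS);`. With JUnit 4, `@Test(timeout = 60000)` gives a similar fail-fast effect without a custom helper.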

 

Arnaud

 

Log:

[2018-11-07 11:11:11,432] INFO Running job on local embedded Flink mini cluster (org.apache.flink.streaming.api.environment.LocalStreamEnvironment:114)

[2018-11-07 11:11:11,449] INFO Starting Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:227)

[2018-11-07 11:11:11,636] INFO Starting Metrics Registry (org.apache.flink.runtime.minicluster.MiniCluster:238)

[2018-11-07 11:11:11,652] INFO No metrics reporter configured, no metrics will be exposed/reported. (org.apache.flink.runtime.metrics.MetricRegistryImpl:113)

[2018-11-07 11:11:11,703] INFO Starting RPC Service(s) (org.apache.flink.runtime.minicluster.MiniCluster:249)

[2018-11-07 11:11:12,244] INFO Slf4jLogger started (akka.event.slf4j.Slf4jLogger:92)

[2018-11-07 11:11:12,264] INFO Starting high-availability services (org.apache.flink.runtime.minicluster.MiniCluster:290)

[2018-11-07 11:11:12,367] INFO Created BLOB server storage directory C:\Users\alinz\AppData\Local\Temp\blobStore-fd104a2d-caaf-4740-a762-d292cb2ed108 (org.apache.flink.runtime.blob.BlobServer:141)

[2018-11-07 11:11:12,379] INFO Started BLOB server at 0.0.0.0:64504 - max concurrent requests: 50 - max backlog: 1000 (org.apache.flink.runtime.blob.BlobServer:203)

[2018-11-07 11:11:12,380] INFO Starting ResourceManger (org.apache.flink.runtime.minicluster.MiniCluster:301)

[2018-11-07 11:11:12,409] INFO Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:12,432] INFO Proposing leadership to contender [hidden email] @ akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:12,439] INFO ResourceManager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 was granted leadership with fencing token 86394924fb97bad612b67f526f84406f (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:953)

[2018-11-07 11:11:12,440] INFO Starting the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:185)

[2018-11-07 11:11:12,442] INFO Received confirmation of leadership for leader akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 , session=12b67f52-6f84-406f-8639-4924fb97bad6 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:12,452] INFO Created BLOB cache storage directory C:\Users\alinz\AppData\Local\Temp\blobStore-b2618f73-5ec6-4fdf-ad43-1da6d6c19a4f (org.apache.flink.runtime.blob.PermanentBlobCache:107)

[2018-11-07 11:11:12,454] INFO Created BLOB cache storage directory C:\Users\alinz\AppData\Local\Temp\blobStore-df6c61d2-3c51-4335-a96e-6b00c82e4d90 (org.apache.flink.runtime.blob.TransientBlobCache:107)

[2018-11-07 11:11:12,454] INFO Starting 1 TaskManger(s) (org.apache.flink.runtime.minicluster.MiniCluster:316)

[2018-11-07 11:11:12,460] INFO Starting TaskManager with ResourceID: e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (org.apache.flink.runtime.taskexecutor.TaskManagerRunner:352)

[2018-11-07 11:11:12,531] INFO Temporary file directory 'C:\Users\alinz\AppData\Local\Temp': total 476 GB, usable 149 GB (31,30% usable) (org.apache.flink.runtime.taskexecutor.TaskManagerServices:720)

[2018-11-07 11:11:12,757] INFO Allocated 396 MB for network buffer pool (number of memory segments: 12686, bytes per segment: 32768). (org.apache.flink.runtime.io.network.buffer.NetworkBufferPool:114)

[2018-11-07 11:11:12,765] INFO Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder. (org.apache.flink.runtime.query.QueryableStateUtils:84)

[2018-11-07 11:11:12,765] INFO Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder. (org.apache.flink.runtime.query.QueryableStateUtils:141)

[2018-11-07 11:11:12,766] INFO Starting the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:304)

[2018-11-07 11:11:12,768] WARN No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted. (org.apache.flink.runtime.taskmanager.TaskManagerLocation:102)

[2018-11-07 11:11:12,769] INFO Limiting managed memory to 0.7 of the currently free heap space (2493 MB), memory will be allocated lazily. (org.apache.flink.runtime.taskexecutor.TaskManagerServices:331)

[2018-11-07 11:11:12,776] INFO I/O manager uses directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 for spill files. (org.apache.flink.runtime.io.disk.iomanager.IOManager:95)

[2018-11-07 11:11:12,793] INFO Messages have a max timeout of 10000 ms (org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration:188)

[2018-11-07 11:11:12,803] INFO Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/taskmanager_0 . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:12,813] INFO Start job leader service. (org.apache.flink.runtime.taskexecutor.JobLeaderService:118)

[2018-11-07 11:11:12,814] INFO Connecting to ResourceManager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f). (org.apache.flink.runtime.taskexecutor.TaskExecutor:904)

[2018-11-07 11:11:12,814] INFO User file cache uses directory C:\Users\alinz\AppData\Local\Temp\flink-dist-cache-648ab4eb-f39c-4262-a3cc-07adfa6e5b43 (org.apache.flink.runtime.filecache.FileCache:107)

[2018-11-07 11:11:12,815] INFO Starting dispatcher rest endpoint. (org.apache.flink.runtime.minicluster.MiniCluster:327)

[2018-11-07 11:11:12,845] INFO Resolved ResourceManager address, beginning registration (org.apache.flink.runtime.taskexecutor.TaskExecutor:201)

[2018-11-07 11:11:12,846] INFO Registration at ResourceManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.taskexecutor.TaskExecutor:250)

[2018-11-07 11:11:12,853] INFO Registering TaskManager with ResourceID e84ce076-ec5e-48d6-90dc-4b18ba7c5757 (akka://flink/user/taskmanager_0) at ResourceManager (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:727)

[2018-11-07 11:11:12,855] INFO Successful registration at resource manager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864 under registration id 8098d3fe3fe83133051c3bb97bf96d37. (org.apache.flink.runtime.taskexecutor.TaskExecutor:94)

[2018-11-07 11:11:12,877] INFO Starting rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:128)

[2018-11-07 11:11:13,168] WARN Log file environment variable 'log.file' is not set. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:95)

[2018-11-07 11:11:13,169] WARN JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (deprecated keys: [jobmanager.web.log.path])'. (org.apache.flink.runtime.webmonitor.WebMonitorUtils:101)

[2018-11-07 11:11:13,195] INFO Failed to load web based job submission extension. Probable reason: flink-runtime-web is not in the classpath. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:115)

[2018-11-07 11:11:13,514] INFO Rest endpoint listening at localhost:64523 (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:200)

[2018-11-07 11:11:13,514] INFO Proposing leadership to contender [hidden email] @ http://localhost:64523 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,514] INFO Starting job dispatcher(s) for JobManger (org.apache.flink.runtime.minicluster.MiniCluster:364)

[2018-11-07 11:11:13,514] INFO http://localhost:64523 was granted leadership with leaderSessionID=940dc860-bbcb-4e35-8255-59702b220383 (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:757)

[2018-11-07 11:11:13,515] INFO Received confirmation of leadership for leader http://localhost:64523 , session=940dc860-bbcb-4e35-8255-59702b220383 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,523] INFO Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,537] INFO Proposing leadership to contender [hidden email] @ akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,538] INFO Flink Mini Cluster started successfully (org.apache.flink.runtime.minicluster.MiniCluster:410)

[2018-11-07 11:11:13,538] INFO Dispatcher akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a was granted leadership with fencing token d7ae33ab-20bb-4598-bc6d-b7a57ca66860 (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:818)

[2018-11-07 11:11:13,549] INFO Recovering all persisted jobs. (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:658)

[2018-11-07 11:11:13,550] INFO Received confirmation of leadership for leader akka://flink/user/dispatcher23a0d53f-9eab-495e-a398-b13b5993811a , session=d7ae33ab-20bb-4598-bc6d-b7a57ca66860 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,566] INFO Submitting job 0ef8697ca98f6d2b565ed928d17c8a49 (Simple Test). (org.apache.flink.runtime.dispatcher.StandaloneDispatcher:247)

[2018-11-07 11:11:13,592] INFO Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_1 . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,618] INFO Initializing job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:269)

[2018-11-07 11:11:13,623] INFO Using restart strategy NoRestartStrategy for Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:280)

[2018-11-07 11:11:13,629] INFO Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/96fe7c78-28fe-484f-ae16-dcd1d4bc2c6b . (org.apache.flink.runtime.rpc.akka.AkkaRpcService:224)

[2018-11-07 11:11:13,654] INFO Job recovers via failover strategy: full graph restart (org.apache.flink.runtime.executiongraph.ExecutionGraph:425)

[2018-11-07 11:11:13,689] INFO Running initialization on master for job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49). (org.apache.flink.runtime.jobmaster.JobMaster:195)

[2018-11-07 11:11:13,689] INFO Successfully ran initialization on master in 0 ms. (org.apache.flink.runtime.jobmaster.JobMaster:224)

[2018-11-07 11:11:13,722] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.runtime.jobmaster.JobMaster:230)

[2018-11-07 11:11:13,740] INFO Proposing leadership to contender [hidden email] @ akka://flink/user/jobmanager_1 (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:274)

[2018-11-07 11:11:13,740] INFO JobManager runner for job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) was granted leadership with session id 56e8324b-0015-4464-b6c7-ba0accdcec2a at akka://flink/user/jobmanager_1. (org.apache.flink.runtime.jobmaster.JobManagerRunner:329)

[2018-11-07 11:11:13,743] INFO Starting execution of job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) (org.apache.flink.runtime.jobmaster.JobMaster:1009)

[2018-11-07 11:11:13,744] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state CREATED to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)

[2018-11-07 11:11:13,747] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to SCHEDULED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,753] INFO Connecting to ResourceManager akka://flink/user/resourcemanager_f70e9071-b9d0-489a-9dcc-888f75b46864(86394924fb97bad612b67f526f84406f) (org.apache.flink.runtime.jobmaster.JobMaster:1285)

[2018-11-07 11:11:13,754] INFO Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=56e8324b-0015-4464-b6c7-ba0accdcec2a (org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedLeaderService:229)

[2018-11-07 11:11:13,757] INFO Resolved ResourceManager address, beginning registration (org.apache.flink.runtime.jobmaster.JobMaster:201)

[2018-11-07 11:11:13,758] INFO Registration at ResourceManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.jobmaster.JobMaster:250)

[2018-11-07 11:11:13,763] INFO Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:733)

[2018-11-07 11:11:13,767] INFO Registering job manager b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:298)

[2018-11-07 11:11:13,771] INFO Registered job manager b6c7ba0accdcec2a56e8324b00154464@akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:672)

[2018-11-07 11:11:13,772] INFO JobManager successfully registered at ResourceManager, leader id: 86394924fb97bad612b67f526f84406f. (org.apache.flink.runtime.jobmaster.JobMaster:1307)

[2018-11-07 11:11:13,773] INFO Requesting new slot [SlotRequestId{8e1e4cc71c946b4bf7c5d63f706796db}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager. (org.apache.flink.runtime.jobmaster.slotpool.SlotPool:689)

[2018-11-07 11:11:13,776] INFO Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 0ef8697ca98f6d2b565ed928d17c8a49 with allocation id AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.resourcemanager.StandaloneResourceManager:422)

[2018-11-07 11:11:13,778] INFO Receive slot request AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a} for job 0ef8697ca98f6d2b565ed928d17c8a49 from resource manager with leader id 86394924fb97bad612b67f526f84406f. (org.apache.flink.runtime.taskexecutor.TaskExecutor:743)

[2018-11-07 11:11:13,779] INFO Allocated slot for AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.taskexecutor.TaskExecutor:755)

[2018-11-07 11:11:13,779] INFO Add job 0ef8697ca98f6d2b565ed928d17c8a49 for job leader monitoring. (org.apache.flink.runtime.taskexecutor.JobLeaderService:186)

[2018-11-07 11:11:13,781] INFO Try to register at job manager akka://flink/user/jobmanager_1 with leader id 56e8324b-0015-4464-b6c7-ba0accdcec2a. (org.apache.flink.runtime.taskexecutor.JobLeaderService:326)

[2018-11-07 11:11:13,782] INFO Resolved JobManager address, beginning registration (org.apache.flink.runtime.taskexecutor.JobLeaderService:201)

[2018-11-07 11:11:13,782] INFO Registration at JobManager attempt 1 (timeout=100ms) (org.apache.flink.runtime.taskexecutor.JobLeaderService:250)

[2018-11-07 11:11:13,785] INFO Successful registration at job manager akka://flink/user/jobmanager_1 for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.JobLeaderService:374)

[2018-11-07 11:11:13,786] INFO Establish JobManager connection for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1137)

[2018-11-07 11:11:13,789] INFO Offer reserved slots to the leader of job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1042)

[2018-11-07 11:11:13,794] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from SCHEDULED to DEPLOYING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,794] INFO Deploying Source: Custom Source -> Sink: Print to Std. Out (1/1) (attempt #0) to 127.0.0.1 (org.apache.flink.runtime.executiongraph.ExecutionGraph:576)

[2018-11-07 11:11:13,819] INFO Received task Source: Custom Source -> Sink: Print to Std. Out (1/1). (org.apache.flink.runtime.taskexecutor.TaskExecutor:541)

[2018-11-07 11:11:13,820] INFO Activate slot AllocationID{4d6ce3339a4c7bb811e9ba5e66a55b4a}. (org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable:237)

[2018-11-07 11:11:13,820] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from CREATED to DEPLOYING. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,820] INFO Creating FileSystem stream leak safety net for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING] (org.apache.flink.runtime.taskmanager.Task:579)

[2018-11-07 11:11:13,828] INFO Loading JAR files for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:586)

[2018-11-07 11:11:13,829] INFO Registering task at network: Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [DEPLOYING]. (org.apache.flink.runtime.taskmanager.Task:612)

[2018-11-07 11:11:13,836] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,840] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.streaming.runtime.tasks.StreamTask:230)

0

1

2

3

4

[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). (org.apache.flink.runtime.taskmanager.Task:818)

[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] (org.apache.flink.runtime.taskmanager.Task:845)

[2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)

[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,907] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)

[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)

[2018-11-07 11:11:13,908] INFO Shutting down (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)

[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:427)

[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)

[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)

[2018-11-07 11:11:23,583] INFO Shutting down TaskExecutorLocalStateStoresManager. (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)

[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)

[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)

[2018-11-07 11:11:23,591] INFO Removing cache directory C:\Users\alinz\AppData\Local\Temp\flink-web-ui (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)

[2018-11-07 11:11:23,593] INFO Closing the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)

[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)

[2018-11-07 11:11:23,596] INFO Close ResourceManager connection cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)

[2018-11-07 11:11:23,607] INFO Stop job leader service. (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)

[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)

 

 

 

From: LINZ, Arnaud
Sent: Tuesday, November 6, 2018 14:23
To: user <[hidden email]>
Subject: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

 

Hello,

 

In Flink 1.3, I was able to cleanly stop an HA streaming application simply by returning from the source “run()” method (once an ending condition was met).

I am trying to upgrade my code to Flink 1.6.2, but that no longer works.

 

Even when there are no sources left and no items to process, the cluster keeps running forever, with an endless series of messages such as:

Checkpoint triggering task Source: Custom Source (1/2) of job 3b286f5344c50f0e466bb8ee79a2bb69 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.

 

Why has this behavior changed? How am I supposed to stop a streaming execution from its own code now? Is https://issues.apache.org/jira/browse/FLINK-2111 of any use?  

 

Thanks,

Arnaud

 





Re: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

Fabian Hueske-2
Hi Arnaud,

Thanks for reporting the issue!

Best, Fabian

On Thu, Nov 8, 2018 at 20:50, LINZ, Arnaud <[hidden email]> wrote:

Created FLINK-10832 (with great difficulty, as typing Java code into a JIRA description was an awful experience :-))

 

 

From: LINZ, Arnaud
Sent: Wednesday, November 7, 2018 11:43
To: 'user' <[hidden email]>
Subject: RE: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

 

FYI, the code below terminates with version 1.6.0 but does not terminate with 1.6.1. I suspect this is a bug rather than a new feature.

 

From: LINZ, Arnaud
Sent: Wednesday, November 7, 2018 11:14
To: 'user' <[hidden email]>
Subject: RE: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

 

Hello,

 

This has nothing to do with HA. All my unit tests involving a streaming app now hang in “infinite execution”.

This simple code never terminates:

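import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.Test; // assuming JUnit 4

    @Test
    public void testFlink162() throws Exception {
        // get the execution environment (a local embedded mini cluster in a unit test)
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data: a bounded source whose run() returns after emitting "0".."4"
        final DataStreamSource<String> text = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(final SourceContext<String> ctx) throws Exception {
                for (int count = 0; count < 5; count++) {
                    ctx.collect(String.valueOf(count));
                }
            }

            @Override
            public void cancel() {
                // nothing to do: run() terminates on its own
            }
        });

        text.print().setParallelism(1);

        env.execute("Simple Test");
        // Never returns!
    }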
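For comparison, the source in the test above leaves cancel() empty. A common way to write a source that can both end on its own and be cancelled (in Flink, cancel() is called by the runtime, typically from another thread) is to loop on a volatile flag that cancel() clears, and simply return from run() once the ending condition is met. The sketch below shows that pattern in plain Java so it runs without Flink; `BoundedSource` and its `Consumer<String>` emitter are hypothetical stand-ins for a Flink `SourceFunction` and its `SourceContext`, not the Flink API itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BoundedSourceSketch {

    // Hypothetical stand-in for a Flink source: run() emits records to `emit`
    // until its ending condition is met or cancel() has been called.
    static final class BoundedSource {
        private final int limit;
        // volatile: cancel() is normally called from a different thread than run()
        private volatile boolean running = true;

        BoundedSource(int limit) {
            this.limit = limit;
        }

        void run(Consumer<String> emit) {
            // Returning from run() is how the source signals "no more input".
            for (int count = 0; count < limit && running; count++) {
                emit.accept(String.valueOf(count));
            }
        }

        void cancel() {
            running = false;
        }
    }

    // Runs the source to completion and returns everything it emitted.
    public static List<String> collect(int limit) {
        List<String> out = new ArrayList<>();
        new BoundedSource(limit).run(out::add);
        return out;
    }

    // Cancels the source from inside the emitter after `cancelAfter` records.
    public static List<String> collectUntilCancelled(int limit, int cancelAfter) {
        List<String> out = new ArrayList<>();
        BoundedSource source = new BoundedSource(limit);
        source.run(record -> {
            out.add(record);
            if (out.size() == cancelAfter) {
                source.cancel();
            }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(5));                  // prints [0, 1, 2, 3, 4]
        System.out.println(collectUntilCancelled(5, 2)); // prints [0, 1]
    }
}
```

Because the flag is checked before each emission, a cancellation takes effect at the next loop iteration, which is why `collectUntilCancelled(5, 2)` stops after two records.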

Is this really a new feature or a critical bug?

According to the log, the task executor is stopped:

[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)

But execute() does not return.

 

Arnaud
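While execute() hangs like this, each affected unit test blocks the whole build. One possible mitigation in the test code itself, sketched below in plain Java, is to run the blocking call on a separate daemon thread and wait for it with a timeout. `TimeoutGuard` and `finishesWithin` are hypothetical helper names and the timeout values are arbitrary; this only bounds the wait and does not fix whatever keeps execute() from returning.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutGuard {

    // Runs blockingCall on a daemon thread and waits at most `timeout` for it.
    // Returns true if it completed in time, false if it timed out.
    public static boolean finishesWithin(Callable<?> blockingCall, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "blocking-call");
            t.setDaemon(true); // a hung call must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(blockingCall);
        try {
            future.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: interrupts the worker thread
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("blocking call failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A call that returns immediately: prints true
        System.out.println(finishesWithin(() -> "done", 1, TimeUnit.SECONDS));
        // A call that never returns on its own: prints false
        System.out.println(finishesWithin(() -> {
            new CountDownLatch(1).await();
            return null;
        }, 200, TimeUnit.MILLISECONDS));
    }
}
```

In the test above, the call could be wrapped as `finishesWithin(() -> env.execute("Simple Test"), 60, TimeUnit.SECONDS)`, failing the test when it returns false.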

 


[2018-11-07 11:11:13,840] INFO No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880) (org.apache.flink.streaming.runtime.tasks.StreamTask:230)

0

1

2

3

4

[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)

[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). (org.apache.flink.runtime.taskmanager.Task:818)

[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed for task Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] (org.apache.flink.runtime.taskmanager.Task:845)

[2018-11-07 11:11:13,899] INFO Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)

[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)

[2018-11-07 11:11:13,907] INFO Job Simple Test (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)

[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 0ef8697ca98f6d2b565ed928d17c8a49. (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)

[2018-11-07 11:11:13,908] INFO Shutting down (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)

[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster (org.apache.flink.runtime.minicluster.MiniCluster:427)

[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)

[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)

[2018-11-07 11:11:23,583] INFO Shutting down TaskExecutorLocalStateStoresManager. (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)

[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)

[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)

[2018-11-07 11:11:23,591] INFO Removing cache directory C:\Users\alinz\AppData\Local\Temp\flink-web-ui (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)

[2018-11-07 11:11:23,593] INFO Closing the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)

[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)

[2018-11-07 11:11:23,596] INFO Close ResourceManager connection cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)

[2018-11-07 11:11:23,607] INFO Stop job leader service. (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)

[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor akka://flink/user/taskmanager_0. (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)


From: LINZ, Arnaud
Sent: Tuesday, 6 November 2018 14:23
To: user <[hidden email]>
Subject: Stopping a streaming app from its own code : behaviour change from 1.3 to 1.6

Hello,

In Flink 1.3, I was able to cleanly stop an HA streaming application just by returning from the source's “run()” method (once an ending condition was met).

I am trying to update my code to Flink 1.6.2, but that no longer works.
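The approach described above (a source whose run() method simply returns once an ending condition is met) can be sketched as follows. This is a minimal sketch assuming the Flink 1.6 DataStream API (`SourceFunction`, `SinkFunction`, `StreamExecutionEnvironment`) is on the classpath. `BoundedSourceSketch`, `CountingSource`, `CollectSink` and the limit of 5 are hypothetical names and values chosen to mirror the "Simple Test" job in the log above, which emits 0 to 4; this is not the original test code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class BoundedSourceSketch {

    /** Hypothetical source: emits 0..limit-1, then returns from run(). */
    public static class CountingSource implements SourceFunction<Integer> {
        private final int limit;
        // volatile because cancel() is called from a different thread than run()
        private volatile boolean running = true;

        public CountingSource(int limit) {
            this.limit = limit;
        }

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            int i = 0;
            // Ending condition: stop after `limit` elements, or earlier if cancelled.
            while (running && i < limit) {
                // Emit under the checkpoint lock so records and checkpoints do not interleave.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(i++);
                }
            }
            // Returning here lets the source task switch from RUNNING to FINISHED.
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Hypothetical sink that keeps results in a static list (single-JVM local runs only). */
    public static class CollectSink implements SinkFunction<Integer> {
        static final List<Integer> VALUES = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Integer value, Context context) {
            VALUES.add(value);
        }
    }

    public static List<Integer> runJob() throws Exception {
        CollectSink.VALUES.clear();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // one source instance, so output order is 0, 1, 2, 3, 4
        env.addSource(new CountingSource(5)).addSink(new CollectSink());
        // execute() blocks until the job reaches a terminal state.
        env.execute("Simple Test");
        return new ArrayList<>(CollectSink.VALUES);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runJob());
    }
}
```

The static list in `CollectSink` only works because a local environment runs the whole job in one JVM; on a real cluster a print or file sink would be used instead.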

Even when there are no sources and no items left to process, the cluster keeps running forever, logging an endless series of messages such as:

Checkpoint triggering task Source: Custom Source (1/2) of job 3b286f5344c50f0e466bb8ee79a2bb69 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.

Why has this behavior changed? How am I supposed to stop a streaming execution from its own code now? Is https://issues.apache.org/jira/browse/FLINK-2111 of any use?

Thanks,

Arnaud