Flink CLI distributed cache fault


Vasyl Bervetskyi

Hi there,

I ran into an issue when adding a file to the distributed cache in Flink.

My setup:

- Java 1.8
- Flink 1.8
- OS: Windows, Linux

Test scenario:

1. Create a simple stream execution environment
2. Add a local file to the distributed cache
3. Add a simple source function and a sink
4. Execute the job from the Flink CLI (Windows/Linux)

To restore a job from a savepoint or a checkpoint, we need to run it from the Flink CLI. However, pipelines that use the distributed cache fail when they are submitted this way.

Moreover, the behavior differs between Linux and Windows: on Windows we get “java.nio.file.InvalidPathException: Illegal char <:> at index 4”, while on Linux Flink freezes (it just hangs and does nothing, with no error message or stack trace).

My code for the Windows environment:

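import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CachePipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register a local file in the distributed cache under the name "MyFile"
        see.registerCachedFile("file:///D:/test.csv", "MyFile");

        see.addSource(new SourceFunction<Integer>() {

            // Cleared by cancel() so that run() can exit
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                // Emit the constant 5 once per second until cancelled
                while (running) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(5);
                    }
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }

        }).print();

        see.execute();
    }
}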

 

Command I used to run the job:

flink run -c test.CachePipeline D:\path\to\jar\cache-test.jar

On Linux, I changed the file location to:

see.registerCachedFile("file:///home/test.csv", "MyFile");

Windows stacktrace:

flink run -c com.CachePipeline D:\repository\cache-test.jar

 

log4j:WARN No appenders could be found for logger (org.apache.flink.client.cli.CliFrontend).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Starting execution of program

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 38631d859b64cd86201bbe09a32c62f3)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
        at com.granduke.teleprocessing.CachePipeline.main(CachePipeline.java:29)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Unknown Source)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
        at java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.nio.file.InvalidPathException: Illegal char <:> at index 4: file:/D:/test.csv
        at sun.nio.fs.WindowsPathParser.normalize(Unknown Source)
        at sun.nio.fs.WindowsPathParser.parse(Unknown Source)
        at sun.nio.fs.WindowsPathParser.parse(Unknown Source)
        at sun.nio.fs.WindowsPath.parse(Unknown Source)
        at sun.nio.fs.WindowsFileSystem.getPath(Unknown Source)
        at java.nio.file.Paths.get(Unknown Source)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$3(RestClusterClient.java:353)
        at java.util.concurrent.CompletableFuture.uniApply(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
        ... 5 more
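The last "Caused by" block shows RestClusterClient passing the cached file's location, by then written as `file:/D:/test.csv`, straight to `java.nio.file.Paths.get`. The standalone sketch below reproduces that step outside Flink, under the assumption (not confirmed here) that this is the root cause. The class and method names (`CachedFileUriDemo`, `normalize`, `toLocalPath`) are hypothetical and are not Flink API. It shows two things: a `file` URI rebuilt without an authority component serializes with a single slash, which matches the string in the trace; and `Paths.get(String)` treats the `file:` scheme as part of the file name (Windows rejects the `:`, other systems accept it as a relative path), whereas `Paths.get(URI)` strips the scheme. Whether the same mechanism explains the Linux freeze is unverified.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical demo class; not part of Flink.
public class CachedFileUriDemo {

    // Rebuilds a URI without an authority component. Such a URI serializes
    // with a single slash: "file:///D:/test.csv" -> "file:/D:/test.csv".
    static String normalize(String location) {
        URI uri = URI.create(location);
        try {
            return new URI(uri.getScheme(), null, uri.getPath(), null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Converts a location string to a local Path. Strings with a "file:"
    // scheme go through Paths.get(URI); anything else is a plain path.
    static Path toLocalPath(String location) {
        if (location.startsWith("file:")) {
            return Paths.get(URI.create(location));
        }
        return Paths.get(location);
    }

    public static void main(String[] args) {
        String uriString = normalize("file:///D:/test.csv");
        System.out.println("normalized: " + uriString); // file:/D:/test.csv

        try {
            // Treats the whole string, scheme included, as a file name:
            // throws on Windows (':' is illegal), yields a relative path elsewhere.
            System.out.println("Paths.get(String): " + Paths.get(uriString));
        } catch (InvalidPathException e) {
            System.out.println("Paths.get(String) failed: " + e.getMessage());
        }

        // The URI overload strips the scheme and yields the real file location.
        System.out.println("Paths.get(URI):    " + toLocalPath("file:/home/test.csv"));
    }
}
```

Run on Windows, the `Paths.get(String)` branch should print the same "Illegal char <:> at index 4: file:/D:/test.csv" message as the trace; on Linux it prints a path that still starts with `file:`.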

 

I also ran different versions of Flink on both operating systems, with the following results:

- Flink 1.8.0 (Windows/Linux): not working
- Flink 1.7.2 (Windows/Linux): not working
- Flink 1.5.6 (Windows/Linux): not working
- Flink 1.4 (Windows/Linux): works as expected

 

Has anybody else run into this?