Flink Table API and not recognizing s3 plugins

Dan
Background
I'm converting some prototype Flink v1.11.1 code that uses DataSet/DataTable APIs to use the Table API.

Problem
When I switched to the Table API, my s3 plugins stopped working, and I don't know why. I've added the required Maven Table dependencies to the job.

I've tried moving the Presto and the Hadoop s3 jars (individually and together) into plugin subfolders. No luck.

Any ideas what's wrong? I'm guessing I'm missing something simple.


Error

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:473)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
at org.apache.flink.table.filesystem.FileSystemTableSink.toStagingPath(FileSystemTableSink.java:232)
... 35 more


ls of plugins directory (same for taskmanager):

kubectl exec pod/flink-jobmanager-0 -- ls -l /opt/flink/plugins/s3-fs-hadoop
total 19520
-rw-r--r-- 1 root root 19985452 Sep 10 06:27 flink-s3-fs-hadoop-1.11.1.jar
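
(Side note: the fs.allowed-fallback-filesystems fallback that the error message mentions would be configured roughly like this in flink-conf.yaml; it requires Hadoop on the classpath, so it is not a drop-in alternative to the plugin here:)

# sketch only: allow the s3 scheme to fall back to a Hadoop FileSystem
echo 'fs.allowed-fallback-filesystems: s3' >> $FLINK_HOME/conf/flink-conf.yaml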



Re: Flink Table API and not recognizing s3 plugins

Arvid Heise-3
Hi Dan,

s3p is only provided by the flink-s3-fs-presto plugin; the plugin you used provides s3a.
(Both also provide s3, but it's good to use the more specific prefix.)

Best,

Arvid
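
For reference, a minimal sketch of the layout those schemes map to, assuming the stock Flink 1.11.1 distribution (the bundled filesystem jars ship under opt/):

# s3p:// (and s3://) come from the Presto plugin
mkdir -p $FLINK_HOME/plugins/s3-fs-presto
cp $FLINK_HOME/opt/flink-s3-fs-presto-1.11.1.jar $FLINK_HOME/plugins/s3-fs-presto/
# s3a:// (and s3://) come from the Hadoop plugin
mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop
cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/s3-fs-hadoop/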

On Thu, Sep 10, 2020 at 9:24 AM Dan Hill <[hidden email]> wrote: [...]

--
Arvid Heise | Senior Java Developer

Follow us @VervericaData
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

Re: Flink Table API and not recognizing s3 plugins

Dan
Ah, sorry, that's a copy/paste issue with this email. I've tried all of the following:
1) using an s3a URI with the flink-s3-fs-hadoop jar in /opt/flink/plugins/s3-fs-hadoop.
2) using an s3p URI with the flink-s3-fs-presto jar in /opt/flink/plugins/s3-fs-presto.
3) loading both 1 and 2.
4) using an s3 URI.
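
For reference, the resulting layout can be sanity-checked with a recursive listing (same style of command as in my first mail; for attempt 3 both subfolders should show up):

kubectl exec pod/flink-jobmanager-0 -- ls -R /opt/flink/plugins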

When doing 1)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


When doing 2)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


etc

On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise <[hidden email]> wrote: [...]

Re: Flink Table API and not recognizing s3 plugins

Arvid Heise-3
Could you try 1) or 2) and enable debug logging* and share the log with us?

*Usually by adjusting FLINK_HOME/conf/log4j.properties.
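
A sketch of one way to do that, assuming the default Flink 1.11 Log4j 2 property files (which ship with rootLogger.level = INFO); note that the CLI tools read log4j-cli.properties rather than log4j.properties:

sed -i 's/^rootLogger.level = INFO/rootLogger.level = DEBUG/' $FLINK_HOME/conf/log4j.properties
sed -i 's/^rootLogger.level = INFO/rootLogger.level = DEBUG/' $FLINK_HOME/conf/log4j-cli.properties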

On Thu, Sep 10, 2020 at 5:38 PM Dan Hill <[hidden email]> wrote: [...]

Re: Flink Table API and not recognizing s3 plugins

Dan
I changed the levels to DEBUG. I don't see anything useful in the logs.

On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise <[hidden email]> wrote: [...]

Re: Flink Table API and not recognizing s3 plugins

Arvid Heise-3
Hi Dan,

somehow enabling the debug statements did not work.

However, the log still helps to narrow down the issue: the exception occurs neither on the jobmanager nor on the taskmanager. It occurs wherever you execute the command line interface.

How do you execute the job? Do you start it from your machine? Can you also try adding the respective s3 plugin there?

Best,

Arvid
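
For reference, a minimal sketch of what that would mean, assuming the client machine or container uses the standard distribution layout:

# on the host (or in the image) where `flink run` is invoked
mkdir -p $FLINK_HOME/plugins/s3-fs-presto
cp $FLINK_HOME/opt/flink-s3-fs-presto-1.11.1.jar $FLINK_HOME/plugins/s3-fs-presto/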

On Thu, Sep 10, 2020 at 7:50 PM Dan Hill <[hidden email]> wrote: [...]

Re: Flink Table API and not recognizing s3 plugins

Dan
I was able to get more info to output on the jobmanager:

2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
2020-09-10 19:50:17,731 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: blob.server.port, 6124
2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.rpc.port, 6122
2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.heap.size, 1g
2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1g
2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, rocksdb
2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints
2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints
2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.async, true
2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.memory-threshold, 1024
2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096
2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.incremental, true
2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.local-recovery, true
2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.num-retained, 1
2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery
2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1
2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb
2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory
2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT
2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP
2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false
2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb
2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.endpoint, http://minio:9000
2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.path.style.access, true
2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.access-key, YOURACCESSKEY
2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.secret-key, ******
2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
2020-09-10 19:50:17,802 INFO  org.apache.flink.client.cli.CliFrontend [] - Loading FallbackYarnSessionCli
2020-09-10 19:50:17,929 INFO  org.apache.flink.core.fs.FileSystem [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-09-10 19:50:18,102 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-09-10 19:50:18,126 INFO  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-1506212733867615019.conf.
2020-09-10 19:50:18,161 INFO  org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-09-10 19:50:18,163 INFO  org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
2020-09-10 19:50:18,226 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-09-10 19:50:19,107 INFO  org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
2020-09-10 19:50:19,414 INFO  org.apache.flink.client.cli.CliFrontend [] - Successfully retrieved list of jobs


On Thu, Sep 10, 2020 at 10:50 AM Dan Hill <[hidden email]> wrote: [...]

Re: Flink Table API and not recognizing s3 plugins

Dan
Copying more of the log:

2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------
2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend [] - Starting Command Line Client (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - OS current user: root
2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - Current Hadoop/Kerberos user: <no hadoop dependency found>
2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.265-b01
2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend [] - Maximum heap size: 2167 MiBytes
tail: log/flink--client-flink-jobmanager-0.log: file truncated
2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] - JAVA_HOME: /usr/local/openjdk-8
2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] - No Hadoop Dependency available
2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] - JVM Options:
2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Djava.security.properties=/opt/flink/conf/security.properties
2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log
2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] - Program Arguments:
2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    list
2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    --jobmanager
2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] -    localhost:8081
2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend [] - Classpath: /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::
2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend [] - --------------------------------------------------------------------------------

(the rest is the same configuration-loading output as in my previous message)




On Thu, Sep 10, 2020 at 1:02 PM Arvid Heise <[hidden email]> wrote: [...]

Re: Flink Table API and not recognizing s3 plugins

Dan
I'm using this Helm chart. I start the job by building an image with the job jar and using kubectl apply to do a 'flink run' with the jar.

The log4j.properties files on the jobmanager and taskmanager have the debug level set and are pretty embedded into the Helm chart. My log4j-cli.properties is hacked together on the CLI side.

I thought I only needed the s3 plugins on the jobmanager and taskmanager. Do I need a similar plugin structure in the image where I run 'flink run'?
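
For reference, a sketch of that structure in a client image, assuming it is based on the official flink:1.11.1 image (where the distribution lives under /opt/flink):

# e.g. as a RUN step when building the job/client image
mkdir -p /opt/flink/plugins/s3-fs-presto
cp /opt/flink/opt/flink-s3-fs-presto-1.11.1.jar /opt/flink/plugins/s3-fs-presto/

(The official images also expose an ENABLE_BUILT_IN_PLUGINS environment variable that copies bundled plugin jars into plugins/ at container start, though that is worth verifying for this image version.)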


On Thu, Sep 10, 2020 at 1:03 PM Dan Hill <[hidden email]> wrote: [...]

2020-09-10 19:50:17,929 INFO  org.apache.flink.core.fs.FileSystem                          [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.

2020-09-10 19:50:18,102 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.

2020-09-10 19:50:18,126 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-1506212733867615019.conf.

2020-09-10 19:50:18,161 INFO  org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.

2020-09-10 19:50:18,163 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Running 'list' command.

2020-09-10 19:50:18,226 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.

2020-09-10 19:50:19,107 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Waiting for response...

2020-09-10 19:50:19,414 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Successfully retrieved list of jobs



On Thu, Sep 10, 2020 at 1:02 PM Arvid Heise <[hidden email]> wrote:
Hi Dan,

Somehow enabling the debug statements did not work.

However, the log helps to narrow down the issue: the exception occurs neither on the jobmanager nor on the taskmanager; it occurs wherever you execute the command line interface.

How do you execute the job? Do you start it from your machine? Can you also try adding the respective s3 plugin there?

Best,

Arvid

On Thu, Sep 10, 2020 at 7:50 PM Dan Hill <[hidden email]> wrote:
I changed the levels to DEBUG.  I don't see useful data in the logs.

On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise <[hidden email]> wrote:
Could you try 1) or 2), enable debug logging*, and share the log with us?

*Usually by adjusting FLINK_HOME/conf/log4j.properties.
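
For example, a minimal sketch of doing that from the pod (the log4j2 key and file name are assumptions based on the stock 1.11 conf; per the JVM options in the log, the CLI reads log4j-cli.properties):

# set the CLI logger to DEBUG (key/path assumed from the official 1.11 image)
kubectl exec pod/flink-jobmanager-0 -- sed -i 's/rootLogger.level = INFO/rootLogger.level = DEBUG/' /opt/flink/conf/log4j-cli.properties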

On Thu, Sep 10, 2020 at 5:38 PM Dan Hill <[hidden email]> wrote:
Ah, sorry, it's a copy/paste issue with this email.  I've tried all of the following:
1) using an s3a uri with the flink-s3-fs-hadoop jar in /opt/flink/plugins/s3-fs-hadoop.
2) using an s3p uri with the flink-s3-fs-presto jar in /opt/flink/plugins/s3-fs-presto.
3) loading both 1 and 2.
4) trying an s3 uri.

When doing 1)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


When doing 2)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


etc
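
For reference, a minimal sketch of the layout those error messages ask for, assuming the official flink:1.11.1 image, which ships the filesystem jars under /opt/flink/opt (adjust the version to your distribution):

# one subfolder per plugin under /opt/flink/plugins
mkdir -p /opt/flink/plugins/s3-fs-presto /opt/flink/plugins/s3-fs-hadoop
cp /opt/flink/opt/flink-s3-fs-presto-1.11.1.jar /opt/flink/plugins/s3-fs-presto/
cp /opt/flink/opt/flink-s3-fs-hadoop-1.11.1.jar /opt/flink/plugins/s3-fs-hadoop/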

Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Arvid Heise-3
In general, I'd assume that JM and TM are enough.

However, it seems like the query planner is doing some path sanitization for which it needs the filesystem. Since I don't know this part too well, I'm pulling in Jark and Dawid, who may know more.

I'm also not sure whether this is intentional or a bug; if it is intentional, it might be worth documenting.

On Thu, Sep 10, 2020 at 10:10 PM Dan Hill <[hidden email]> wrote:
I'm using this Helm chart.  I start the job by building an image with the job jar and using kubectl apply to do a 'flink run' with the jar.

The log4j.properties on the jobmanager and taskmanager have the debug level set and are pretty deeply embedded in the Helm chart.  My log4j-cli.properties is hand-edited on the CLI side.

I thought I just needed the s3 plugins in the jobmanager and taskmanager.  Do I need to have a similar plugin structure in the image where I run 'flink run'?
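
A minimal sketch of mirroring that structure in the client image, assuming the same official image layout (the FLINK_PLUGINS_DIR override is an assumption; verify it against your version):

# inside the image/container where 'flink run' is invoked
mkdir -p /opt/flink/plugins/s3-fs-presto
cp /opt/flink/opt/flink-s3-fs-presto-1.11.1.jar /opt/flink/plugins/s3-fs-presto/
# optionally point the plugin loader at the directory explicitly
export FLINK_PLUGINS_DIR=/opt/flink/plugins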


Dan
Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Dan
In reply to this post by Dan
This is running on my local minikube and is trying to hit minio.

Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Dawid Wysakowicz-2

Hi Dan,

As far as I can tell from the code, the FileSystemSink will try to create staging directories from the client. I think that might be problematic, as your case shows; we might need to revisit that part. I am cc'ing Jingsong, who worked on the FileSystemSink.

As a workaround, you might try putting the s3 plugin on the CLI classpath (I am not sure if plugins work for the CLI through the /plugins directory).

Best,

Dawid
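
A rough sketch of that workaround, assuming the official image layout; jars dropped into /opt/flink/lib land on the client classpath instead of being loaded as a plugin:

# workaround sketch (paths and version assumed; job jar path is a placeholder)
cp /opt/flink/opt/flink-s3-fs-presto-1.11.1.jar /opt/flink/lib/
/opt/flink/bin/flink run /path/to/your-job.jar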

On 10/09/2020 22:13, Dan Hill wrote:
This is running on my local minikube and is trying to hit minio.

On Thu, Sep 10, 2020 at 1:10 PM Dan Hill <[hidden email]> wrote:
I'm using this Helm chart.  I start the job by building an image with the job jar and using kubectl apply to do a flink run with the jar.

The log4j.properties on jobmanager and taskmanager have debug level set and are pretty embedded into the Helm chart.  My log4j-cli.properties is hacked on the CLI side.

I thought I just needed the s3 plugins in the jobmanager and taskmanager.  Do I need to have a similar plugin structure from the image where I run 'flink run'?


On Thu, Sep 10, 2020 at 1:03 PM Dan Hill <[hidden email]> wrote:
Copying more of the log

2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------

2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Starting Command Line Client (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  OS current user: root

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Current Hadoop/Kerberos user: <no hadoop dependency found>

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.265-b01

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Maximum heap size: 2167 MiBytes

tail: log/flink--client-flink-jobmanager-0.log: file truncated

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JAVA_HOME: /usr/local/openjdk-8

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  No Hadoop Dependency available

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM Options:

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Djava.security.properties=/opt/flink/conf/security.properties

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Program Arguments:

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     list

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     --jobmanager

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     localhost:8081

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Classpath: /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::

2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------

2020-09-10 19:50:17,731 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: blob.server.port, 6124

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.rpc.port, 6122

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.heap.size, 1g

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1g

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend, rocksdb

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.async, true

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.fs.memory-threshold, 1024

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.incremental, true

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.local-recovery, true

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.checkpoints.num-retained, 1

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.endpoint, http://minio:9000

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.path.style.access, true

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.access-key, YOURACCESSKEY

2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.secret-key, ******

2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

2020-09-10 19:50:17,802 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Loading FallbackYarnSessionCli

2020-09-10 19:50:17,929 INFO  org.apache.flink.core.fs.FileSystem                          [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.

2020-09-10 19:50:18,102 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.

2020-09-10 19:50:18,126 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-1506212733867615019.conf.

2020-09-10 19:50:18,161 INFO  org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.

2020-09-10 19:50:18,163 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Running 'list' command.

2020-09-10 19:50:18,226 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.

2020-09-10 19:50:19,107 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Waiting for response...

2020-09-10 19:50:19,414 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Successfully retrieved list of jobs



On Thu, Sep 10, 2020 at 1:02 PM Arvid Heise <[hidden email]> wrote:
Hi Dan,

somehow enabling debug statements did not work.

However, the logs helps to narrow down the issue. The exception occurs neither on jobmanager nor on taskmanager. It occurs wherever you execute the command line interface.

How do you execute the job? Do you start it from your machine? Can you try out to also add the respective s3 plugin there?

Best,

Arvid

On Thu, Sep 10, 2020 at 7:50 PM Dan Hill <[hidden email]> wrote:
I changed the levels to DEBUG.  I don't see useful data in the logs.

On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise <[hidden email]> wrote:
Could you try 1) or 2) and enable debug logging* and share the log with us?

*Usually by adjusting FLINK_HOME/conf/log4j.properties.

On Thu, Sep 10, 2020 at 5:38 PM Dan Hill <[hidden email]> wrote:
Ah, sorry, it's a copy/paste issue with this email.  I've tried both:
1) using s3a uri with flink-s3-fs-hadoop jar in /opt/flink/plugins/s3-fs-hadoop.
2) using s3p uri with flink-s3-fs-presto jar in /opt/flink/plugins/s3-fs-presto.
3) loading both 1 and 2
4) trying s3 uri.

When doing 1)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


When doing 2)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


etc
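
For reference, a sketch of the on-disk layout that attempts 1) and 2) aim for, with one subfolder per plugin jar:

/opt/flink/plugins/
    s3-fs-hadoop/
        flink-s3-fs-hadoop-1.11.1.jar
    s3-fs-presto/
        flink-s3-fs-presto-1.11.1.jar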

Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Jingsong Li
Hi Dan,

I think Arvid and Dawid are right: as a workaround, you can try making the S3 filesystem work in the client. But for a long-term solution, we can fix it in Flink.


Best,
Jingsong 

On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Dan,

As far as I checked in the code, the FileSystemSink will try to create staging directories from the client. I think that might be problematic, as your case shows. We might need to revisit that part. I am cc'ing Jingsong, who worked on the FileSystemSink.

As a workaround, you might try putting the s3 plugin on the CLI classpath (I am not sure whether plugins work for the CLI through the /plugins directory).
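A sketch of what that could look like inside the client container, assuming the official image layout where the filesystem jars ship under /opt/flink/opt (whether the CLI honors the plugins directory is exactly the open question):

# copy the bundled presto filesystem jar into its own plugin subfolder
mkdir -p /opt/flink/plugins/s3-fs-presto
cp /opt/flink/opt/flink-s3-fs-presto-1.11.1.jar /opt/flink/plugins/s3-fs-presto/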

Best,

Dawid

On 10/09/2020 22:13, Dan Hill wrote:
This is running on my local minikube and is trying to hit minio.

On Thu, Sep 10, 2020 at 1:10 PM Dan Hill <[hidden email]> wrote:
I'm using this Helm chart.  I start the job by building an image that contains the job jar and using kubectl apply to do a 'flink run' with the jar.

The log4j.properties files on the jobmanager and taskmanager have the debug level set and are baked into the Helm chart.  My log4j-cli.properties is hand-edited on the CLI side.

I thought I just needed the s3 plugins on the jobmanager and taskmanager.  Do I need a similar plugin structure in the image where I run 'flink run'?


Dan
Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Dan
Thanks for the update! 

I'm trying a bunch of combinations on the client side to get the S3 filesystem picked up correctly.  Most of my attempts involved building it into the job jar (which I'm guessing won't work).  I then started getting ClassCastExceptions.

I might try a little more tomorrow (e.g. modifying the custom image).  If I can't get it working, I'll roll back to a previous Flink version that works.

Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory

at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)

at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)

at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:431)

... 51 more
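
This particular ClassCastException usually points to duplicate compiler classes on the classpath, for example the table planner bundled into the job jar while the same classes already sit in /opt/flink/lib. A sketch of the usual fix, assuming a Maven build against the blink planner: mark the dependency as provided so it is not shaded into the fat jar.

<!-- assumption: the job depends on the blink planner that already ships in /opt/flink/lib -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.12</artifactId>
  <version>1.11.1</version>
  <scope>provided</scope>
</dependency>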



Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Arvid Heise-3
Hi Dan,

Are you also running the client in K8s? If so, you need an initialization step where you add the library to the plugins directory. Putting it into lib or into the user jar no longer works, as we removed the shading from the s3 filesystems in Flink 1.10.

The official Flink docker image has an easy way to add these plugins [1].
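A sketch of what that looks like, assuming the flink:1.11.1 image, whose entrypoint copies the named jars from /opt/flink/opt into /opt/flink/plugins/ at container startup:

FROM flink:1.11.1-scala_2.12
ENV ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-presto-1.11.1.jar

Or, equivalently, as an environment variable in the client pod spec:

env:
  - name: ENABLE_BUILT_IN_PLUGINS
    value: flink-s3-fs-presto-1.11.1.jar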


Dan
Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Dan
Yes, the client runs in K8s.  It uses a different K8s config than the Helm chart and does not load the plugins.  Does the client use the same plugin structure as the Flink job/task managers?  I can try that tomorrow.

Cool, that link would work too.

Thanks, Arvid!


On Mon, Sep 14, 2020 at 10:59 PM Arvid Heise <[hidden email]> wrote:
Hi Dan,

Are you running the client also in K8s? If so, you need an initialization step where you add the library to the plugins directory. Putting it into lib or into the user jar no longer works, since we removed the shading from the s3 filesystems in Flink 1.10.

The official Flink docker image has an easy way to add these plugins [1].
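As a rough sketch (assuming your client image is derived from the official flink:1.11.1 image, which ships both s3 filesystem jars under /opt/flink/opt), the initialization step can be baked into the Dockerfile:

FROM flink:1.11.1-scala_2.12
# Promote the bundled s3 filesystem into its own plugins subfolder,
# so the client/CLI picks it up the same way the job/task managers do.
RUN mkdir -p /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.11.1.jar /opt/flink/plugins/s3-fs-hadoop/

If I recall correctly, the official image can also do this copy for you at container startup via the ENABLE_BUILT_IN_PLUGINS environment variable.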


On Tue, Sep 15, 2020 at 6:40 AM Dan Hill <[hidden email]> wrote:
Thanks for the update! 

I'm trying a bunch of combinations on the client side to get the S3 filesystem picked up correctly.  Most of my attempts involved building it into the job jar (which I'm guessing won't work).  I then started getting ClassCastExceptions.

I might try a little more tomorrow (e.g. modifying the custom image).  If I can't get it to work, I'll roll back to a previous Flink version.

Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory

at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)

at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)

at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:431)

... 51 more
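Side note in case someone else hits the same ClassCastException: this reportedly happens when the job jar bundles its own copy of the table planner next to the flink-table-blink jar that already sits in /opt/flink/lib, so the Janino classes get loaded twice by different classloaders. If that is the cause here, marking the planner as provided in the job's pom should avoid the clash. A sketch, assuming a Maven build:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.11.1</version>
    <!-- provided: the cluster's lib/ directory already ships these classes -->
    <scope>provided</scope>
</dependency>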



On Mon, Sep 14, 2020 at 7:03 PM Jingsong Li <[hidden email]> wrote:
Hi Dan,

I think Arvid and Dawid are right: as a workaround, you can try making the S3 filesystem work in the client. But as a long-term solution, we can fix it.


Best,
Jingsong 

On Mon, Sep 14, 2020 at 3:57 PM Dawid Wysakowicz <[hidden email]> wrote:

Hi Dan,

As far as I can tell from the code, the FileSystemSink will try to create staging directories from the client. I think it might be problematic, as your case shows. We might need to revisit that part. I am cc'ing Jingsong, who worked on the FileSystemSink.
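For context, the kind of sink definition that exercises this client-side staging path looks roughly like the following (a sketch; the table name, path, and format are made up):

CREATE TABLE s3_sink (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 's3p://my-bucket/output',
  'format' = 'csv'
);

An INSERT INTO against such a table resolves the staging path while the job is still being planned on the client, which is why the s3 plugin has to be visible there as well.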

As a workaround you might try putting the s3 plugin on the CLI classpath (not sure if plugins work for the CLI through the /plugins directory).

Best,

Dawid

On 10/09/2020 22:13, Dan Hill wrote:
This is running on my local minikube and is trying to hit minio.

On Thu, Sep 10, 2020 at 1:10 PM Dan Hill <[hidden email]> wrote:
I'm using this Helm chart.  I start the job by building an image with the job jar and using kubectl apply to do a flink run with the jar.

The log4j.properties on the jobmanager and taskmanager have the debug level set and are pretty deeply embedded in the Helm chart.  My log4j-cli.properties is hacked together on the CLI side.

I thought I just needed the s3 plugins in the jobmanager and taskmanager.  Do I need a similar plugin structure in the image where I run 'flink run'?


On Thu, Sep 10, 2020 at 1:03 PM Dan Hill <[hidden email]> wrote:
Copying more of the log

2020-09-10 19:50:17,712 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------

2020-09-10 19:50:17,718 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Starting Command Line Client (Version: 1.11.1, Scala: 2.12, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  OS current user: root

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Current Hadoop/Kerberos user: <no hadoop dependency found>

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.265-b01

2020-09-10 19:50:17,719 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Maximum heap size: 2167 MiBytes

tail: log/flink--client-flink-jobmanager-0.log: file truncated

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JAVA_HOME: /usr/local/openjdk-8

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  No Hadoop Dependency available

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  JVM Options:

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Djava.security.properties=/opt/flink/conf/security.properties

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog.file=/opt/flink/log/flink--client-flink-jobmanager-0.log

2020-09-10 19:50:17,720 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Program Arguments:

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     list

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     --jobmanager

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -     localhost:8081

2020-09-10 19:50:17,721 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Classpath: /opt/flink/lib/flink-csv-1.11.1.jar:/opt/flink/lib/flink-json-1.11.1.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.11.1.jar:/opt/flink/lib/flink-table_2.12-1.11.1.jar:/opt/flink/lib/jna-5.4.0.jar:/opt/flink/lib/jna-platform-5.4.0.jar:/opt/flink/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/lib/log4j-api-2.12.1.jar:/opt/flink/lib/log4j-core-2.12.1.jar:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/lib/oshi-core-3.4.0.jar:/opt/flink/lib/flink-dist_2.12-1.11.1.jar:::

2020-09-10 19:50:17,722 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------

2020-09-10 19:50:17,731 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.numberOfTaskSlots, 2

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: blob.server.port, 6124

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.rpc.port, 6122

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.heap.size, 1g

2020-09-10 19:50:17,732 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 1g

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend, rocksdb

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.checkpoints.dir, file:///flink_state/checkpoints

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.savepoints.dir, file:///flink_state/savepoints

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.async, true

2020-09-10 19:50:17,733 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.fs.memory-threshold, 1024

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.fs.write-buffer-size, 4096

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.incremental, true

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.local-recovery, true

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.checkpoints.num-retained, 1

2020-09-10 19:50:17,734 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.state.local.root-dirs, file:///flink_state/local-recovery

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.checkpoint.transfer.thread.num, 1

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.localdir, /flink_state/rocksdb

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.options-factory, org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.predefined-options, DEFAULT

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.timer-service.factory, HEAP

2020-09-10 19:50:17,735 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.backend.rocksdb.ttl.compaction.filter.enabled, false

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, flink-jobmanager

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.jvm-metaspace.size, 256mb

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.endpoint, http://minio:9000

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.path.style.access, true

2020-09-10 19:50:17,736 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.access-key, YOURACCESSKEY

2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.secret-key, ******

2020-09-10 19:50:17,737 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3a.aws.credentials.provider, org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider

2020-09-10 19:50:17,802 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Loading FallbackYarnSessionCli

2020-09-10 19:50:17,929 INFO  org.apache.flink.core.fs.FileSystem                          [] - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.

2020-09-10 19:50:18,102 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory [] - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.

2020-09-10 19:50:18,126 INFO  org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file will be created as /tmp/jaas-1506212733867615019.conf.

2020-09-10 19:50:18,161 INFO  org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory [] - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.

2020-09-10 19:50:18,163 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Running 'list' command.

2020-09-10 19:50:18,226 INFO  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.

2020-09-10 19:50:19,107 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Waiting for response...

2020-09-10 19:50:19,414 INFO  org.apache.flink.client.cli.CliFrontend                      [] - Successfully retrieved list of jobs



On Thu, Sep 10, 2020 at 1:02 PM Arvid Heise <[hidden email]> wrote:
Hi Dan,

somehow enabling debug statements did not work.

However, the logs help narrow down the issue. The exception occurs neither on the jobmanager nor on the taskmanager; it occurs wherever you execute the command line interface.

How do you execute the job? Do you start it from your machine? Can you also try adding the respective s3 plugin there?

Best,

Arvid

On Thu, Sep 10, 2020 at 7:50 PM Dan Hill <[hidden email]> wrote:
I changed the levels to DEBUG.  I don't see useful data in the logs.

On Thu, Sep 10, 2020 at 8:45 AM Arvid Heise <[hidden email]> wrote:
Could you try 1) or 2), enable debug logging*, and share the log with us?

*Usually by adjusting FLINK_HOME/conf/log4j.properties.
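For the CLI specifically it is conf/log4j-cli.properties rather than log4j.properties. A sketch of the change (Flink 1.11 ships log4j2-style properties files, so the existing rootLogger line just needs its level raised):

# FLINK_HOME/conf/log4j-cli.properties
rootLogger.level = DEBUG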

On Thu, Sep 10, 2020 at 5:38 PM Dan Hill <[hidden email]> wrote:
Ah, sorry, it's a copy/paste issue with this email.  I've tried all of the following:
1) using the s3a uri with the flink-s3-fs-hadoop jar in /opt/flink/plugins/s3-fs-hadoop.
2) using the s3p uri with the flink-s3-fs-presto jar in /opt/flink/plugins/s3-fs-presto.
3) loading both 1) and 2).
4) using the s3 uri.

When doing 1)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


When doing 2)

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin: flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.


etc

On Thu, Sep 10, 2020 at 8:15 AM Arvid Heise <[hidden email]> wrote:
Hi Dan,

s3p is only provided by flink-s3-fs-presto plugin. The plugin you used provides s3a.
(and both provide s3, but it's good to use the more specific prefix).

Best,

Arvid
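To spell out the mapping (bucket and key below are placeholders):

s3a://bucket/key  ->  needs flink-s3-fs-hadoop in plugins/s3-fs-hadoop/
s3p://bucket/key  ->  needs flink-s3-fs-presto in plugins/s3-fs-presto/
s3://bucket/key   ->  served by whichever of the two plugins is installed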

Dan
Reply | Threaded
Open this post in threaded view
|

Re: Flink Table API and not recognizing s3 plugins

Dan
Sweet, this was the issue.  I got this to work by copying the s3 jar into the plugins directory of the client container.

Thanks for all of the help!  The Table API is sweet!
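For anyone landing on this thread later, the fix amounted to giving the client container the same plugin layout as the job/task managers. A sketch of the manual equivalent (the pod name here is made up):

kubectl exec pod/flink-client -- mkdir -p /opt/flink/plugins/s3-fs-hadoop
kubectl cp flink-s3-fs-hadoop-1.11.1.jar flink-client:/opt/flink/plugins/s3-fs-hadoop/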
