Filesystem as a stream source in Table/SQL API


Filesystem as a stream source in Table/SQL API

Kai Fu

Hi,

I'm facing a situation where I want a Flink application to dynamically detect changes in a filesystem batch data source. When I run the example below in sql-client.sh, the select returns all the records under the folder.

When I add a new file to the folder, the query does not refresh, and it does not seem to detect the new file. Records from the new file only appear if I cancel the current query and run the select again. Is it possible to make the application detect such file changes automatically, as a streaming source does?

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  hh STRING
)  WITH (
  'connector'='filesystem',
  'path'='file:///path/folder/',
  'format'='csv'
);

select * from fs_table;

-- Best wishes
Kai


Re: Filesystem as a stream source in Table/SQL API

Xingbo Huang
Hi Kai,

I took a look at the implementation of the filesystem connector. It decides which files to read at startup,
and that set of files does not change while the job is running. If you need this functionality, you may need to implement a custom connector.
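For readers on newer Flink releases (not the version discussed in this November 2020 thread): later versions of the filesystem SQL connector document a `source.monitor-interval` option that makes the source re-scan the configured path periodically instead of listing the files only once at startup. The following is a minimal sketch of the table from the question with that option set, assuming your Flink version documents the option; the `'10 s'` interval is an arbitrary example value.

```sql
-- Assumes a Flink release whose filesystem connector supports
-- 'source.monitor-interval'. Without this option the path is scanned once
-- and the source is bounded, which is the behaviour described in this thread.
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  hh STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/folder/',
  'format' = 'csv',
  'source.monitor-interval' = '10 s'
);

-- With monitoring enabled this keeps running and emits rows from files
-- that are added to the folder after the query starts.
SELECT * FROM fs_table;
```

As documented for that option, each file is identified by its path and processed once, so a file that is modified in place after it has been read is not expected to be re-read.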

Best,
Xingbo

On Sat, 21 Nov 2020 at 14:38, eef hhj <[hidden email]> wrote:

Re: Filesystem as a stream source in Table/SQL API

Jark Wu-3
Hi Kai,

Streaming filesystem sources are not yet supported in the Table API/SQL.
This is on the roadmap, but there are some problems that still need to be fixed.
As a workaround, you can use the Hive connector to read files continuously from
filesystems [1].
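A minimal sketch of this workaround in the SQL client of that era, assuming a `HiveCatalog` is already registered under the hypothetical name `myhive`, and reusing the folder and columns from the original question (the table name `fs_hive_table` is also hypothetical). The `streaming-source.enable` and `streaming-source.monitor-interval` table properties turn the Hive table into an unbounded source; for a non-partitioned table such as this one, the Hive connector documentation describes monitoring the folder for new files and reading them incrementally.

```sql
-- Assumption: a HiveCatalog named myhive (hypothetical) is already registered
-- with the SQL client; fs_hive_table is a hypothetical table name.
USE CATALOG myhive;

-- Hive-style DDL has to be parsed with the Hive dialect.
SET table.sql-dialect=hive;

-- Points at the same folder as the original question. The two
-- streaming-source.* properties make Flink read the table as an unbounded
-- source that re-scans the folder once per monitor interval.
CREATE EXTERNAL TABLE fs_hive_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  hh STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'file:///path/folder/'
TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.monitor-interval' = '1 m'
);

-- Switch back to the default dialect for regular Flink SQL queries.
SET table.sql-dialect=default;

-- Runs as a continuous query; files added to the folder later should show up.
SELECT * FROM fs_hive_table;
```

The `'1 m'` monitor interval is an example value: shorter intervals pick up new files sooner at the cost of listing the folder more often. The exact `SET` syntax differs between SQL client versions, so check it against the documentation for the release you run.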

Best,
Jark

On Mon, 23 Nov 2020 at 10:21, Xingbo Huang <[hidden email]> wrote:

Re: Filesystem as a stream source in Table/SQL API

Kai Fu
Hi Jark,

Thank you for the quick and helpful response. I will try the Hive connector solution. It would be great if you could point me to the issue tracking the streaming filesystem source.

On Mon, Nov 23, 2020 at 10:33 AM Jark Wu <[hidden email]> wrote:

--
Best regards,
Fu Kai(扶凯)

Re: Filesystem as a stream source in Table/SQL API

Jark Wu-3

On Mon, 23 Nov 2020 at 12:19, eef hhj <[hidden email]> wrote: