Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

Yik San Chan
(The question is cross-posted on StackOverflow https://stackoverflow.com/questions/66632765/got-pyflink-util-exceptions-tableexception-findandcreatetablesource-failed-w)

I am running below PyFlink program (copied from https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html)

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

To verify it works, I did the following in order:

1. Run `echo -e  "flink\npyflink\nflink" > /tmp/input`
1. Run `python WordCount.py`
1. Run `cat /tmp/out` and find expected output

Then I changed my PyFlink program a bit to prefer SQL over Table API, but I find it doesn't work.

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Here's the error:

```
Traceback (most recent call last):
  File "WordCount.py", line 38, in <module>
    .execute_insert('mySink').wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py", line 864, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)
at org.apache.flink.table.api.internal.TableEnvImpl.getTableSink(TableEnvImpl.scala:1097)
at org.apache.flink.table.api.internal.TableEnvImpl.org$apache$flink$table$api$internal$TableEnvImpl$$writeToSinkAndTranslate(TableEnvImpl.scala:929)
at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:556)
at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:554)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.api.internal.TableEnvImpl.executeInternal(TableEnvImpl.scala:554)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```

I wonder what's wrong with my new program? Thanks!
Reply | Threaded
Open this post in threaded view
|

Re: Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

Xingbo Huang
Hi,

The problem is that the legacy DataSet you are using does not support the FileSystem connector you declared. You can use blink Planner to achieve your needs.

>>>
    t_env = BatchTableEnvironment.create(
        environment_settings=EnvironmentSettings.new_instance()
        .in_batch_mode().use_blink_planner().build())
    t_env._j_tenv.getPlanner().getExecEnv().setParallelism(1)

    my_source_ddl = """
        create table mySource (
            word VARCHAR
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '/tmp/input'
        )
    """

    my_sink_ddl = """
        create table mySink (
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '/tmp/output'
        )
    """

    t_env.execute_sql(my_source_ddl)
    t_env.execute_sql(my_sink_ddl)

    tab = t_env.from_path('mySource')
    tab.group_by(tab.word) \
        .select(tab.word, lit(1).count) \
        .execute_insert('mySink').wait()
>>>

Best,
Xingbo

Yik San Chan <[hidden email]> 于2021年3月15日周一 下午1:26写道:
(The question is cross-posted on StackOverflow https://stackoverflow.com/questions/66632765/got-pyflink-util-exceptions-tableexception-findandcreatetablesource-failed-w)

I am running below PyFlink program (copied from https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html)

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

To verify it works, I did the following in order:

1. Run `echo -e  "flink\npyflink\nflink" > /tmp/input`
1. Run `python WordCount.py`
1. Run `cat /tmp/out` and find expected output

Then I changed my PyFlink program a bit to prefer SQL over Table API, but I find it doesn't work.

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Here's the error:

```
Traceback (most recent call last):
  File "WordCount.py", line 38, in <module>
    .execute_insert('mySink').wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py", line 864, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)
at org.apache.flink.table.api.internal.TableEnvImpl.getTableSink(TableEnvImpl.scala:1097)
at org.apache.flink.table.api.internal.TableEnvImpl.org$apache$flink$table$api$internal$TableEnvImpl$$writeToSinkAndTranslate(TableEnvImpl.scala:929)
at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:556)
at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:554)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.api.internal.TableEnvImpl.executeInternal(TableEnvImpl.scala:554)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```

I wonder what's wrong with my new program? Thanks!
Reply | Threaded
Open this post in threaded view
|

Re: Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

Yik San Chan
Thanks for your help, it works.

Best,
Yik San Chan

On Tue, Mar 16, 2021 at 10:03 AM Xingbo Huang <[hidden email]> wrote:
Hi,

The problem is that the legacy DataSet you are using does not support the FileSystem connector you declared. You can use blink Planner to achieve your needs.

>>>
    t_env = BatchTableEnvironment.create(
        environment_settings=EnvironmentSettings.new_instance()
        .in_batch_mode().use_blink_planner().build())
    t_env._j_tenv.getPlanner().getExecEnv().setParallelism(1)

    my_source_ddl = """
        create table mySource (
            word VARCHAR
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '/tmp/input'
        )
    """

    my_sink_ddl = """
        create table mySink (
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '/tmp/output'
        )
    """

    t_env.execute_sql(my_source_ddl)
    t_env.execute_sql(my_sink_ddl)

    tab = t_env.from_path('mySource')
    tab.group_by(tab.word) \
        .select(tab.word, lit(1).count) \
        .execute_insert('mySink').wait()
>>>

Best,
Xingbo

Yik San Chan <[hidden email]> 于2021年3月15日周一 下午1:26写道:
(The question is cross-posted on StackOverflow https://stackoverflow.com/questions/66632765/got-pyflink-util-exceptions-tableexception-findandcreatetablesource-failed-w)

I am running below PyFlink program (copied from https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html)

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

To verify it works, I did the following in order:

1. Run `echo -e  "flink\npyflink\nflink" > /tmp/input`
1. Run `python WordCount.py`
1. Run `cat /tmp/out` and find expected output

Then I changed my PyFlink program a bit to prefer SQL over Table API, but I find it doesn't work.

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Here's the error:

```
Traceback (most recent call last):
  File "WordCount.py", line 38, in <module>
    .execute_insert('mySink').wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py", line 864, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)
at org.apache.flink.table.api.internal.TableEnvImpl.getTableSink(TableEnvImpl.scala:1097)
at org.apache.flink.table.api.internal.TableEnvImpl.org$apache$flink$table$api$internal$TableEnvImpl$$writeToSinkAndTranslate(TableEnvImpl.scala:929)
at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:556)
at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:554)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.api.internal.TableEnvImpl.executeInternal(TableEnvImpl.scala:554)
at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```

I wonder what's wrong with my new program? Thanks!