PyFlink java.io.EOFException at java.io.DataInputStream.readInt

PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Yik San Chan
This question is cross-posted on StackOverflow https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint

I have a PyFlink job that reads from a Kafka source, transforms the data, and writes to a Kafka sink. This is a `tree` view of my working directory:

```
> tree
.
├── deps
│   └── flink-sql-connector-kafka_2.12-1.12.0.jar
├── flink_run.sh
├── main.py
├── pyflink1.12.0.zip
└── tasks
    └── user_last_n_clicks
        ├── sink_ddl.sql
        ├── source_ddl.sql
        └── transform_dml.sql
```

This is the `flink_run.sh`:

```
flink run \
--yarnname test-pyflink \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py main.py testing user_last_n_clicks
```

This is `main.py`. The key logic is in:
- the `parse_content` UDF, and
- loading the SQL files from the `tasks` subfolder and executing them via `execute_sql`.

```python
import os
from sys import argv
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

def read_file_content(filepath):
    with open(filepath) as f:
        return f.read()

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res

CWD = os.getcwd()
_, palfish_env, task = argv

VALID_PALFISH_ENVS = ['development', 'testing', 'production']
if palfish_env not in VALID_PALFISH_ENVS:
    raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")

VALID_TASKS = os.listdir(f"{CWD}/tasks")
if task not in VALID_TASKS:
    raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")

config = {
    "development": {
        "${generation.kafka.source.servers}": "localhost:9094",
        "${generation.kafka.sink.servers}": "localhost:9094"
    },
    "testing": {
        "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
        "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
    },
    "production": {
        "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
        "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
    }
}

FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"

source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace('${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace('${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)

t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
t_env.create_temporary_function("ParseContent", parse_content)

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql(transform_dml).wait()
```

These are my SQL files. Note that the UDF `ParseContent` is used in `transform_dml.sql`.

```sql
# source_ddl.sql
CREATE TABLE kafka_source (
`body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
'connector' = 'kafka',
'topic' = 'data-report-stat-old-logtype7',
'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
'properties.group.id' = 'flink-featurepipelines',
'format' = 'json'
)

# transform_dml.sql
INSERT INTO kafka_sink
WITH t1 AS (
SELECT body['log']['uid'] user_id, ParseContent(body['log']['contentstr']) content, body['log']['serverts'] server_ts
FROM kafka_source
),
t2 AS (
SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
FROM t1
WHERE content['item_id'] IS NOT NULL
AND content['tag'] = '点击帖子卡片'
),
last_n AS (
SELECT user_id, item_id, server_ts
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
FROM t2)
WHERE row_num <= 5
)
SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime, LISTAGG(CAST(item_id AS STRING)) last_5_clicks
FROM last_n
GROUP BY user_id

# sink_ddl.sql
CREATE TABLE kafka_sink (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
    'key.format' = 'json',
    'value.format' = 'json'
)
```

I get the following error when running the PyFlink program on my testing environment machine:

```
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
```

For the full logs, see https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.

Any idea why this exception occurs? Thanks.

Yik San

Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Dian Fu
Hi,

What's the Flink version on the cluster nodes? It should match the PyFlink version.
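
For reference, a quick way to check the client-side PyFlink version is a snippet like the following; this is a minimal sketch, assuming the `pyflink.version` module that ships with the apache-flink wheel:

```python
# Minimal sketch: print the PyFlink version installed on the client, so it can
# be compared with the Flink version deployed on the cluster nodes.
from pyflink.version import __version__

print(__version__)  # should match the cluster, e.g. 1.12.0
```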

Regards,
Dian


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Yik San Chan
Hi Dian,

The PyFlink version is 1.12.0, and the Flink version on the cluster nodes is also 1.12.0:

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink

Best,
Yik San


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Dian Fu

I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the jar files on the cluster nodes also built with Scala 2.12? The PyFlink package bundles jar files built with Scala 2.11 by default. I'm still not sure whether this is related to the issue, but I think it is problematic either way. Could you make sure they are consistent?
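
As a hypothetical sketch, if the cluster's Flink distribution is built with Scala 2.11, the `main.py` above could point `pipeline.jars` at the matching 2.11 connector jar instead (assuming flink-sql-connector-kafka_2.11-1.12.0.jar has been downloaded into the same `deps/` folder):

```python
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Hypothetical change to the main.py above: use the Scala 2.11 build of the
# Kafka SQL connector so it matches a Scala 2.11 Flink distribution.
CWD = os.getcwd()
FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.11-1.12.0.jar"

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env,
                                      environment_settings=env_settings)
t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
```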



Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Yik San Chan
Hi Dian,

It is a good catch, though after changing to use flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same error.

Best,
Yik San


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Yik San Chan
Hi Dian,

I am able to reproduce this issue in a much simpler setup. Let me update with the simpler reproducible example shortly.

Best,
Yik San


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Yik San Chan
Hi Dian,

I have simplified the question in https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully. You can also find the updated question below:

I have a PyFlink job that reads from a file, filters based on a condition, and prints the result. This is the PyFlink script `main.py`:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

# https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
  import json
  # a dummy parser
  res = {'item_id': 123, 'tag': 'a'}
  return res

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

t_env.register_function("parse", parse)

my_source_ddl = """
create table mySource (
    id BIGINT,
    contentstr STRING
) with (
    'connector' = 'filesystem',
    'format' = 'json',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    id BIGINT
) with (
    'connector' = 'print'
)
"""

my_transform_dml = """
insert into mySink
with t1 as (
    select id, parse(contentstr) as content
    from mySource
)
select id
from t1
where content['item_id'] is not null
and content['tag'] = 'a'
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
t_env.execute_sql(my_transform_dml).wait()
```

To run `main.py`:
- Ensure pyflink==1.12.0 is installed in my conda env.
- /tmp/input has a single row of content `{"id":1,"tag":"a"}` (see the sketch below for preparing it).
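
Here is a minimal sketch of preparing that input, assuming `/tmp/input` is a plain file holding the single JSON row (not a directory):

```python
# Minimal sketch (assumption: /tmp/input is a single JSON-lines file that the
# filesystem connector declared in my_source_ddl reads directly).
import json

with open("/tmp/input", "w") as f:
    f.write(json.dumps({"id": 1, "tag": "a"}) + "\n")
```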

Then I run `main.py` and I get the exception:

```
Traceback (most recent call last):
  File "udf_parse.py", line 53, in <module>
    t_env.execute_sql(my_transform_dml).wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
```

The issue is probably related to the udf. 

Any help? Thanks!

Best,
Yik San

On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <[hidden email]> wrote:
Hi Dian,

I am able to reproduce this issue in a much simpler setup. Let me update with the simpler reproducible example shortly.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <[hidden email]> wrote:
Hi Dian,

It is a good catch, though after changing to use flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same error.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <[hidden email]> wrote:

I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the jar files on the cluster nodes also built with Scala 2.12? The PyFlink package bundles jar files built with Scala 2.11 by default. I'm still not sure whether it's related to this issue; however, I think the mismatch is problematic. Could you make sure they are consistent?


On Fri, Mar 19, 2021 at 10:40 AM Yik San Chan <[hidden email]> wrote:

Hi Dian,

The PyFlink version is 1.12.0, and the Flink version on the cluster nodes is also 1.12.0.

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink

Best,
Yik San

On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <[hidden email]> wrote:
Hi,

What's the Flink version on the cluster nodes? It should match the PyFlink version.

Regards,
Dian


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Yik San Chan
I figured out the simplified question: the dummy parser should return string keys and string values, otherwise it does not match the declared result_type.
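For reference, here is a minimal sketch of the corrected dummy parser; the only change is that every value in the returned dict is a Python str, matching the declared MAP(STRING, STRING) result type:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    # Both keys and values are plain Python strings ('123' instead of 123),
    # so they match the declared MAP<STRING, STRING> result type.
    res = {'item_id': '123', 'tag': 'a'}
    return res
```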


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Xingbo Huang
Yes, you need to make sure the keys and values you return match the declared key and value types of the MAP.
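For instance, a sketch of the original parse_content udf with every extracted value cast to str so it matches the declared MAP(STRING, STRING) result type (assuming all fields should be carried as strings):

```python
import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    res = {}
    content = json.loads(content_str)
    # Cast each value to str so it matches the declared MAP<STRING, STRING>
    # result type, even when the JSON field is numeric (e.g. postId).
    if 'postId' in content:
        res['item_id'] = str(content['postId'])
    if 'lid' in content:
        res['item_id'] = str(content['lid'])
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = str(content['param']['tag'])
    return res
```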

Best,
Xingbo


Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Dian Fu
Good finding! 

I think we should handle this case in a more user-friendly way, as I expect this issue to be very common for Python users since Python is a dynamic language. I have created https://issues.apache.org/jira/browse/FLINK-21876 to follow up on this issue.

Regards,
Dian
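
Until FLINK-21876 lands, one defensive option (shown here only as an illustrative sketch, not as something from that ticket) is to have the UDF validate its own return value, so a type mismatch surfaces as a readable Python error instead of a java.io.EOFException during deserialization on the Java side:

```python
def ensure_string_map(d):
    # Check every entry against the declared MAP(STRING(), STRING()) result
    # type and fail loudly in Python if anything is not a str.
    bad = {k: v for k, v in d.items()
           if not isinstance(k, str) or not isinstance(v, str)}
    if bad:
        raise TypeError(f"expected MAP<STRING, STRING>, got non-string entries: {bad}")
    return d
```

A UDF whose declared result type is MAP(STRING(), STRING()) can then end with `return ensure_string_map(res)` instead of `return res`.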

On Mar 19, 2021, at 6:57 PM, Xingbo Huang <[hidden email]> wrote:

Yes, you need to ensure that the key and value types of the Map match the declared result type.

Best,
Xingbo

On Fri, Mar 19, 2021 at 3:41 PM, Yik San Chan <[hidden email]> wrote:
I figured out why for the simplified question: the dummy parser should return string keys and string values, otherwise it violates the declared result_type.
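
For example, a version of the dummy parser that satisfies the declared result type might cast every value to a string:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    # Keys and values are all plain Python str, matching MAP<STRING, STRING>.
    return {'item_id': str(123), 'tag': 'a'}
```

The same idea applies to the original `parse_content` UDF: if values such as `content['postId']` come back from `json.loads` as numbers, they need to be wrapped with `str()` before being put into the returned map.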

On Fri, Mar 19, 2021 at 3:37 PM Yik San Chan <[hidden email]> wrote:
Hi Dian,

I have simplified the question in https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully. You can also find the updated question below:

I have a PyFlink job that reads from a file, filters rows based on a condition, and prints the result. This is the PyFlink script `main.py`:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

# https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
  import json
  # a dummy parser
  res = {'item_id': 123, 'tag': 'a'}
  return res

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

t_env.register_function("parse", parse)

my_source_ddl = """
create table mySource (
    id BIGINT,
    contentstr STRING
) with (
    'connector' = 'filesystem',
    'format' = 'json',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    id BIGINT
) with (
    'connector' = 'print'
)
"""

my_transform_dml = """
insert into mySink
with t1 as (
    select id, parse(contentstr) as content
    from mySource
)
select id
from t1
where content['item_id'] is not null
and content['tag'] = 'a'
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
t_env.execute_sql(my_transform_dml).wait()
```

To run `main.py`:
- Ensure pyflink==1.12.0 is installed in my conda env
- Make sure /tmp/input has a single row of content `{"id":1,"tag":"a"}` (see the sketch below)
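
A small sketch (assuming write access to `/tmp`) to create that input file:

```python
# Write the single-line JSON record that the filesystem source will read.
with open("/tmp/input", "w") as f:
    f.write('{"id":1,"tag":"a"}\n')
```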

Then I run `main.py` and I get the exception:

```
Traceback (most recent call last):
  File "udf_parse.py", line 53, in <module>
    t_env.execute_sql(my_transform_dml).wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
```

The issue is probably related to the udf. 

Any help? Thanks!

Best,
Yik San

On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <[hidden email]> wrote:
Hi Dian,

I am able to reproduce this issue in a much simpler setup. Let me follow up with a simpler reproducible example shortly.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <[hidden email]> wrote:
Hi Dian,

It is a good catch, though after changing to use flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same error.

Best,
Yik San

On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <[hidden email]> wrote:

I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the jar files on the cluster nodes also built with Scala 2.12? The PyFlink package bundles jar files built with Scala 2.11 by default. I'm still not sure whether it's related to this issue, but I think it is problematic. Could you make sure that they are consistent?


On Mar 19, 2021, at 10:40 AM, Yik San Chan <[hidden email]> wrote:

Hi Dian,

The PyFlink version is 1.12.0, and the Flink version on the cluster nodes is also 1.12.0:

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink

Best,
Yik San

On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <[hidden email]> wrote:
Hi,

What's the Flink version in the cluster nodes? It should match the PyFlink version.

Regards,
Dian


Reply | Threaded
Open this post in threaded view
|

Re: PyFlink java.io.EOFException at java.io.DataInputStream.readInt

Yik San Chan
Hi Dian,

Thank you for your help!

Best,
Yik San
