This question is cross-posted on StackOverflow https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readint
I have a PyFlink job that reads from a Kafka source, transforms the data, and writes to a Kafka sink. This is a `tree` view of my working directory.

```
> tree
.
├── deps
│   └── flink-sql-connector-kafka_2.12-1.12.0.jar
├── flink_run.sh
├── main.py
├── pyflink1.12.0.zip
└── tasks
    └── user_last_n_clicks
        ├── sink_ddl.sql
        ├── source_ddl.sql
        └── transform_dml.sql
```

This is the `flink_run.sh`:

```
flink run \
--yarnname test-pyflink \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py main.py testing user_last_n_clicks
```

This is the `main.py`. The key logic is in:

- the `parse_content` udf;
- loading the SQL files from the tasks subfolder and running them with `execute_sql`.

```python
import os
from sys import argv
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def read_file_content(filepath):
    with open(filepath) as f:
        return f.read()


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_content(content_str):
    import json
    res = {}
    content = json.loads(content_str)
    if 'postId' in content:
        res['item_id'] = content['postId']
    if 'lid' in content:
        res['item_id'] = content['lid']
    if 'param' in content and 'tag' in content['param']:
        res['tag'] = content['param']['tag']
    return res


CWD = os.getcwd()
_, palfish_env, task = argv

VALID_PALFISH_ENVS = ['development', 'testing', 'production']
if palfish_env not in VALID_PALFISH_ENVS:
    raise Exception(f"{palfish_env} is not a valid env, should be one of [{', '.join(VALID_PALFISH_ENVS)}].")

VALID_TASKS = os.listdir(f"{CWD}/tasks")
if task not in VALID_TASKS:
    raise Exception(f"{task} is not a valid task, should be one of [{', '.join(VALID_TASKS)}].")

config = {
    "development": {
        "${generation.kafka.source.servers}": "localhost:9094",
        "${generation.kafka.sink.servers}": "localhost:9094"
    },
    "testing": {
        "${generation.kafka.source.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092",
        "${generation.kafka.sink.servers}": "10.111.135.233:9092,10.111.130.11:9092,10.111.130.12:9092"
    },
    "production": {
        "${generation.kafka.source.servers}": "10.111.203.9:9092,10.111.203.10:9092,10.111.203.13:9092,10.111.204.163:9092,10.111.204.164:9092,10.111.204.165:9092",
        "${generation.kafka.sink.servers}": "10.111.209.219:9092,10.111.209.220:9092,10.111.209.221:9092"
    }
}

FAT_JAR_PATH = f"{CWD}/deps/flink-sql-connector-kafka_2.12-1.12.0.jar"

# Substitute the environment-specific Kafka servers into the DDL templates.
source_ddl = read_file_content(f'{CWD}/tasks/{task}/source_ddl.sql').replace(
    '${generation.kafka.source.servers}', config[palfish_env]['${generation.kafka.source.servers}'])
sink_ddl = read_file_content(f'{CWD}/tasks/{task}/sink_ddl.sql').replace(
    '${generation.kafka.sink.servers}', config[palfish_env]['${generation.kafka.sink.servers}'])
transform_dml = read_file_content(f'{CWD}/tasks/{task}/transform_dml.sql')

exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=exec_env, environment_settings=env_settings)

t_env.get_config().get_configuration().set_string("pipeline.jars", f"file://{FAT_JAR_PATH}")
t_env.create_temporary_function("ParseContent", parse_content)

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql(transform_dml).wait()
```

See my SQL files. Note that the udf `ParseContent` is used in `transform_dml.sql`.
```sql
-- source_ddl.sql
CREATE TABLE kafka_source (
    `body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
    'connector' = 'kafka',
    'topic' = 'data-report-stat-old-logtype7',
    'properties.bootstrap.servers' = '${generation.kafka.source.servers}',
    'properties.group.id' = 'flink-featurepipelines',
    'format' = 'json'
)

-- transform_dml.sql
INSERT INTO kafka_sink
WITH t1 AS (
    SELECT body['log']['uid'] user_id, ParseContent(body['log']['contentstr']) content, body['log']['serverts'] server_ts
    FROM kafka_source
),
t2 AS (
    SELECT user_id, content['item_id'] item_id, content['tag'] tag, server_ts
    FROM t1
    WHERE content['item_id'] IS NOT NULL
    AND content['tag'] = '点击帖子卡片'  -- tag value, roughly "click post card"
),
last_n AS (
    SELECT user_id, item_id, server_ts
    FROM (
        SELECT *,
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY server_ts DESC) as row_num
        FROM t2)
    WHERE row_num <= 5
)
SELECT user_id, CAST(MAX(server_ts / 1000) AS TIMESTAMP) datetime, LISTAGG(CAST(item_id AS STRING)) last_5_clicks
FROM last_n
GROUP BY user_id

-- sink_ddl.sql
CREATE TABLE kafka_sink (
    user_id BIGINT,
    datetime TIMESTAMP(3),
    last_5_clicks STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'aiinfra.fct.userfeature.0',
    'properties.bootstrap.servers' = '${generation.kafka.sink.servers}',
    'key.format' = 'json',
    'value.format' = 'json'
)
```

I got the following error when running the PyFlink program on my testing environment machine.
```
Caused by: java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[?:1.8.0_261]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:91) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:87) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:36) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:124) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:107) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:104) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49) ~[flink-python_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:84) ~[flink-python_2.11-1.12.0.jar:1.12.0]
```

The full logs are at https://gist.github.com/YikSanChan/d3a5d25cdf2f3c1dc6b3dc93e48c4bbc.

Any idea what causes the exception? Thanks.

Yik San
|
Hi,
What's the Flink version on the cluster nodes? It should match the PyFlink version. Regards, Dian
|
Hi Dian,

The PyFlink version is 1.12.0 and the Flink version on the cluster nodes is also 1.12.0:

$ which flink
/data/apache/flink/flink-1.12.0/bin/flink

Best,
Yik San

On Fri, Mar 19, 2021 at 10:26 AM Dian Fu <[hidden email]> wrote:
|
I noticed that you use "flink-sql-connector-kafka_2.12-1.12.0.jar". Are the jar files on the cluster nodes also built with Scala 2.12? The PyFlink package bundles jar files built with Scala 2.11 by default. I'm not sure yet whether this is related to the issue, but the mismatch is problematic in any case. Could you make sure that they are consistent?
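One quick way to spot the kind of mismatch described above is to read the Scala version out of the jar file names. The snippet below is only a sketch: the helper name `group_jars_by_scala_version` and the file-name pattern are assumptions made for illustration (they are not part of Flink), and the jar names are the ones that appear in this thread.

```python
import re
from collections import defaultdict

# Assumed naming scheme: <artifact>_<scala version>-<flink version>.jar,
# e.g. "flink-python_2.11-1.12.0.jar".
JAR_PATTERN = re.compile(
    r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<flink>\d+\.\d+\.\d+)\.jar$"
)


def group_jars_by_scala_version(jar_names):
    """Group jar file names by the Scala version encoded in their names."""
    groups = defaultdict(list)
    for name in jar_names:
        match = JAR_PATTERN.match(name)
        if match is None:
            continue  # Skip jars that do not follow the assumed naming scheme.
        groups[match.group("scala")].append(name)
    return dict(groups)


# Jar names taken from the directory listing and stack trace in this thread.
jars = [
    "flink-sql-connector-kafka_2.12-1.12.0.jar",
    "flink-table-blink_2.11-1.12.0.jar",
    "flink-python_2.11-1.12.0.jar",
]
by_scala = group_jars_by_scala_version(jars)
if len(by_scala) > 1:
    print("Mixed Scala versions:", sorted(by_scala))
```

A result with more than one key means the jars were built against different Scala versions, which is the inconsistency worth removing first even if it turns out not to be the root cause.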
|
Hi Dian, It is a good catch, though after changing to use flink-sql-connector-kafka_2.11-1.12.0.jar I still get exactly the same error. Best, Yik San On Fri, Mar 19, 2021 at 11:02 AM Dian Fu <[hidden email]> wrote:
|
Hi Dian, I am able to reproduce this issue in a much simpler setup. Let me update with the simpler reproducible example shortly. Best, Yik San On Fri, Mar 19, 2021 at 11:28 AM Yik San Chan <[hidden email]> wrote:
|
Hi Dian,

I have simplified the question at https://stackoverflow.com/questions/66687797/pyflink-java-io-eofexception-at-java-io-datainputstream-readfully. You can also find the updated question below:

I have a PyFlink job that reads from a file, filters rows based on a condition, and prints the result. This is a `tree` view of my working directory. This is the PyFlink script main.py:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

# https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
# https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse(s):
    import json
    # a dummy parser
    res = {'item_id': 123, 'tag': 'a'}
    return res


env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

t_env.register_function("parse", parse)

my_source_ddl = """
create table mySource (
    id BIGINT,
    contentstr STRING
) with (
    'connector' = 'filesystem',
    'format' = 'json',
    'path' = '/tmp/input'
)
"""

my_sink_ddl = """
create table mySink (
    id BIGINT
) with (
    'connector' = 'print'
)
"""

my_transform_dml = """
insert into mySink
with t1 as (
    select id, parse(contentstr) as content
    from mySource
)
select id
from t1
where content['item_id'] is not null
and content['tag'] = 'a'
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
t_env.execute_sql(my_transform_dml).wait()
```

To run `main.py`:

- Make sure pyflink==1.12.0 is installed in my conda env.
- /tmp/input has a single row of content `{"id":1,"tag":"a"}`.

Then I run `main.py` and I get the exception:

```
Traceback (most recent call last):
  File "udf_parse.py", line 53, in <module>
    t_env.execute_sql(my_transform_dml).wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o53.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:123)
    at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:56)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:363)
    at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:110)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readFully(DataInputStream.java:169)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserializeInternal(StringDataSerializer.java:88)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:82)
    at org.apache.flink.table.runtime.typeutils.StringDataSerializer.deserialize(StringDataSerializer.java:34)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserializeInternal(MapDataSerializer.java:129)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:110)
    at org.apache.flink.table.runtime.typeutils.serializers.python.MapDataSerializer.deserialize(MapDataSerializer.java:46)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:106)
    at org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer.deserialize(RowDataSerializer.java:49)
    at org.apache.flink.table.runtime.operators.python.scalar.RowDataPythonScalarFunctionOperator.emitResult(RowDataPythonScalarFunctionOperator.java:81)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:250)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:273)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:199)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitWatermark(SourceOperatorStreamTask.java:170)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.advanceToEndOfEventTime(SourceOperatorStreamTask.java:110)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.afterInvoke(SourceOperatorStreamTask.java:116)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:589)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
```

The issue is probably related to the udf. Any help? Thanks!

Best,
Yik San

On Fri, Mar 19, 2021 at 11:58 AM Yik San Chan <[hidden email]> wrote:
|
I figured out why the simplified example fails: the dummy parser must return a map with string keys and string values. It returns the int 123 for 'item_id', which does not match the declared result_type of MAP(STRING, STRING). On Fri, Mar 19, 2021 at 3:37 PM Yik San Chan <[hidden email]> wrote:
|
Yes, you need to ensure that the key and value types of the Map are fixed and match the declared result type. Best, Xingbo On Fri, Mar 19, 2021 at 3:41 PM Yik San Chan <[hidden email]> wrote:
|
Good finding!
I think we should handle this case in a more user-friendly way. I expect this issue to be very common among Python users, since Python is a dynamically typed language. I have created https://issues.apache.org/jira/browse/FLINK-21876 to follow up on this issue. Regards, Dian
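Until that is handled inside PyFlink, the fix found in this thread can be applied inside the UDF itself by converting every key and value to `str` before returning. The snippet below is a minimal sketch in plain Python: `to_string_map` is a hypothetical helper name, and dropping `None` values (rather than turning them into the string 'None') is a choice made for this illustration, not something confirmed in the thread.

```python
def to_string_map(mapping):
    """Return a copy of `mapping` whose keys and values are all str.

    Entries whose value is None are dropped instead of being
    converted to the string 'None'.
    """
    return {
        str(key): str(value)
        for key, value in mapping.items()
        if value is not None
    }


# The dummy parser from this thread returns an int for 'item_id'.
raw = {'item_id': 123, 'tag': 'a'}
fixed = to_string_map(raw)
print(fixed)  # {'item_id': '123', 'tag': 'a'}
```

In the UDFs shown earlier, this would mean ending the function with `return to_string_map(res)` instead of `return res`, with the helper defined in the same file as the UDF so that it is available wherever the UDF runs.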
|
Hi Dian, Thank you for your help! Best, Yik San On Fri, Mar 19, 2021 at 9:33 PM Dian Fu <[hidden email]> wrote:
|