PyFlink Table API and UDF Limitations

PyFlink Table API and UDF Limitations

Niklas Wilcke
Hi Flink Community,

I'm currently trying to implement a parallel machine learning job with Flink. The goal is to train models in parallel for independent time series in the same data stream. For that purpose I'm using a Python library, which led me to PyFlink. Let me explain the use case a bit more.
I want to implement a batch job, which partitions/groups the data by a device identifier. After that I need to process the data for each device all at once. There is no way to iteratively train the model unfortunately. The challenge I'm facing is to guarantee that all data belonging to a certain device is processed in one single step. I'm aware of the fact that this does not scale well, but for a reasonable amount of input data per device it should be fine from my perspective.
I investigated a lot and ended up using the Table API and Pandas UDFs, which roughly fulfil my requirements, but the following limitations remain, which I would like to discuss.

1. A Pandas UDF takes multiple Series as input parameters, which is fine for my purpose, but as far as I can see there is no way to guarantee that the chunk of data in the Series is "complete". Flink will slice the Series and maybe call the UDF multiple times for each device. As far as I can see there are some config options like "python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time", which might help, but I'm not sure whether this is the right path to take.
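For reference, a minimal sketch of how such options can be set on the table config (the keys are the ones mentioned above plus "python.fn-execution.bundle.size"; the values are purely illustrative, not something I have tuned):

config = t_env.get_config().get_configuration()
# Illustrative values only: they control how many rows end up in one Arrow batch / bundle
# and how long a bundle may be buffered before it is flushed to the Python worker.
config.set_string("python.fn-execution.arrow.batch.size", "10000")
config.set_string("python.fn-execution.bundle.size", "100000")
config.set_string("python.fn-execution.bundle.time", "1000")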

2. The input Series and the output Series need to be of the same length, which isn't nice for my use case. What I would like to do is to process n rows and emit m rows. There shouldn't be any dependency between the number of input rows and the number of output rows.

3. How do I partition the data stream? The Table API offers a group_by, but this doesn't serve my purpose, because I don't want to aggregate all the grouped lines. Instead, as stated above, I want to emit m result lines per group. Are there other options using the Table API or any other API to do this kind of grouping? I would need something like a "keyBy()" from the streaming API. Maybe this can be combined? Can I create a separate table for each key?

I'm also open to ideas for a completely different approach not using the Table API or Pandas UDF. Any idea is welcome.

You can find a condensed version of the source code attached.

Kind Regards,
Niklas



#############################################################

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.FLOAT(), udf_type='pandas')
def forcast(ds_float_series, y):

    # Train the model and create the forecast (elided in this condensed version).
    # Assumption: the training step yields a pandas DataFrame `forecast_df` with a
    # 'yhat' column, and `input_size` (the length of the input Series) is defined elsewhere.
    forecast_df = ...

    yhat_ts = forecast_df['yhat'].tail(input_size)
    return yhat_ts

t_env.register_function("forcast", forcast)

# Define sink and source here

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

# TODO: key_by instead of filter
t_env.from_path('mySource') \
    .where("riid === 'r1i1'") \
    .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
    .insert_into('mySink')

t_env.execute("pandas_udf_demo")

#############################################################




Re: PyFlink Table API and UDF Limitations

Dian Fu
Hi Niklas,

You are correct that the input and output of a Pandas UDF must have the same length, and that Flink will split the input data into multiple bundles for the Pandas UDF, where the bundle size is non-deterministic. Both of the above limitations are by design, so I guess a plain Pandas UDF could not meet your requirements.

However, you could take a look at whether the Pandas UDAF [1], which is supported in 1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you could declare the output type of the Pandas UDAF as an array type
- You then need to flatten the aggregation results, e.g. using UNNEST (see the sketch below)
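A rough, untested sketch of this approach, reusing the t_env and the mySource table from your attached script (the UDAF name, the ARRAY element type and the exact UNNEST syntax are assumptions you would need to verify):

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

# Aggregate all rows of one device (one group) into a single array of forecast values.
@udaf(result_type=DataTypes.ARRAY(DataTypes.FLOAT()), func_type="pandas")
def forcast_agg(ds_float_series, y):
    # Train the model on the complete series of this group (elided)
    # and return the forecast values as a plain Python list.
    return [0.0]  # placeholder

t_env.register_function("forcast_agg", forcast_agg)

# Group by device, aggregate each group into one array, then flatten it back into rows.
result = t_env.sql_query("""
    SELECT agg.riid, t.yhat
    FROM (
        SELECT riid, forcast_agg(ds, y) AS yhats
        FROM mySource
        GROUP BY riid
    ) AS agg
    CROSS JOIN UNNEST(agg.yhats) AS t (yhat)
""")
# result could then be written to a sink table.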

NOTE: Flink 1.12 is still not released. You could try the PyFlink package of RC1[2] for 1.12.0 or build it yourself according to [3].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions

Regards,
Dian

Re: PyFlink Table API and UDF Limitations

Niklas Wilcke
Hi Dian,

thank you very much for this valuable response. I already read about the UDAF, but I wasn't aware of the fact that it is possible to return an array and UNNEST it. I will definitely give it a try and hopefully this will solve my issue.

Another question that came to my mind is whether PyFlink supports any API other than Table and SQL, like the Streaming and Batch APIs. The documentation only covers the Table API, but I'm not sure about that. Can you maybe tell me whether the Table and SQL API is the only one supported by PyFlink?

Kind Regards,
Niklas

 

Re: PyFlink Table API and UDF Limitations

Dian Fu
Hi Niklas,

The Python DataStream API will also be supported in the coming 1.12.0 release [1]. However, its functionality is still limited for the time being compared to the Java DataStream API, e.g. it only supports stateless operations such as map, flat_map, etc.
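For illustration, a minimal, untested sketch of what such a stateless Python DataStream program could look like in 1.12 (the collection, the lambda and the type info are made up for the example):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Stateless transformations such as map / flat_map are available in 1.12.
ds = env.from_collection(
    collection=[(1, 'a'), (2, 'b')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
mapped = ds.map(lambda row: row[1].upper(), output_type=Types.STRING())

# A sink (e.g. mapped.add_sink(...)) and env.execute(...) would follow here.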


Re: PyFlink Table API and UDF Limitations

Niklas Wilcke
Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which will maybe help to answer the questions. I'm currently working on my dev machine with Docker, using one jobmanager container and one taskmanager container. If needed I can share the whole Docker environment, but this would involve some more effort on my side. Here are my five questions.

1. Where can I find the connector libraries for 1.12.0-rc1 or some kind of instruction on how to build them? I can't find them in the 1.12.0-rc1 release, and when I build Flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka.

2. Which steps are needed to properly set up PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for Beam-related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which outlines the steps I'm currently using. The exception's signature looks like this:
   Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

3. When increasing the size of the input data set I get the following exception and the job is canceled. I tried to increase the resources assigned to Flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in the appendix.

4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug, and I can't really find any valuable examples or documentation on the internet. Currently, instead of creating an ARRAY, I'm just returning a VARCHAR containing a string representation of the array. You can find the relevant code in the appendix.

5. How can I obtain the output of the Python interpreter executing the UDF? If I put a print statement in the UDF, I can't see the output in the log of the taskmanager. Is there a way to access it?

I hope these aren't too many questions for this thread. If so, I can still split some of them out into separate threads. Please let me know.
Thank you very much. I really appreciate your help.

Kind Regards,
Niklas


Dockerfile for question 2.
####################################################################
# This image has been built based on the Dockerfile used for the flink image on Docker Hub.
# The only change I applied is that I switched to flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install PyFlink
# (assumes the PyFlink wheel has been copied into the image in an earlier, omitted step)
RUN pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
####################################################################
Stack Trace for question 3.
####################################################################
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
        ... 17 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
################################################################
Code for question 4.
################################################################
# UDAF signature
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
      result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):
    ...  # body omitted, only the signature is relevant here

# SQL DDL
"create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
"create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"

# SQL INSERT
"INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
################################################################

Re: PyFlink Table API and UDF Limitations

Dian Fu
Hi Niklas,

Good to know that this solution may work for you. Regarding the questions you raised, please find my replies inline.

Regards,
Dian

On 13. Nov 2020, at 20:48, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which will maybe help to answer the questions. I'm currently working on my dev machine with Docker, using one jobmanager container and one taskmanager container. If needed I can share the whole Docker environment, but this would involve some more effort on my side. Here are my five questions.

1. Where can I find the connector libraries for 1.12.0-rc1 or some kind of instruction on how to build them? I can't find them in the 1.12.0-rc1 release, and when I build Flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/

2. Which steps are needed to properly set up PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for Beam-related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which outlines the steps I'm currently using. The exception's signature looks like this:
   Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing special you need to do to set up PyFlink. I have manually checked that this class should be there (inside flink-python_2.11-1.12.0.jar), so I guess it could be that your environment isn't clean enough?

I guess you could check the following things:
1) Is it because you have installed 1.11.2 before and so the environment is not clean enough? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 1.12.0-rc1 again? You could also manually check that there should be only one flink-python*.jar under directory xxx/site-packages/pyflink/opt/
2) Verify that the class is actually there by the following command: (flink-python_2.11-1.12.0.jar is under directory xxx/site-packages/pyflink/opt/)
jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If this exception still happens, could you share the exception stack?

3. When increasing the size of the input data set I get the following exception and the job is canceled. I tried to increase the resources assigned to Flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in the appendix.

Could you check if there are any other exceptions in the log when this exception happens?

4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug, and I can't really find any valuable examples or documentation on the internet. Currently, instead of creating an ARRAY, I'm just returning a VARCHAR containing a string representation of the array. You can find the relevant code in the appendix.

There are some examples here: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala

5. How can I obtain the output of the Python interpreter executing the UDF? If I put a print statement in the UDF, I can't see the output in the log of the taskmanager. Is there a way to access it?

You can use the standard Python logging module in the UDF instead of print. The log output can then be found in the log of the task manager.
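For example (a minimal sketch; the UDF and the log message are just illustrative):

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.FLOAT(), func_type='pandas')
def my_udf(series):
    # Messages written via the logging module end up in the taskmanager log.
    logging.info("processing a batch of %d rows", len(series))
    return series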

I hope these aren't too many questions for this thread. If so, I can still split some of them out into separate threads. Please let me know.
Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

Kind Regards,
Niklas


Re: PyFlink Table API and UDF Limitations

Niklas Wilcke
Hi Dian,

this was very helpful again. I will answer the old questions inline below as well. Unfortunately, one new question popped up, too.

How can I ingest data into a batch table from Kafka or, even better, Elasticsearch? Kafka only offers a streaming source and Elasticsearch doesn't offer a source at all.
The only workaround that comes to my mind is to use the Kafka streaming source and apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a stream into a bounded table is somehow possible? Thank you!

Kind Regards,
Niklas



On 13. Nov 2020, at 16:07, Dian Fu <[hidden email]> wrote:

Hi Niklas,

Good to know that this solution may work for you. Regarding the questions you raised, please find my replies inline.

Regards,
Dian

On 13. Nov 2020, at 20:48, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which will maybe help to answer the questions. I'm currently working on my dev machine with Docker, using one jobmanager container and one taskmanager container. If needed I can share the whole Docker environment, but this would involve some more effort on my side. Here are my five questions.

1. Where can I find the connector libraries for 1.12.0-rc1 or some kind of instruction on how to build them? I can't find them in the 1.12.0-rc1 release, and when I build Flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/

Thanks, that worked like a charm!


2. Which steps are needed to properly set up PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for Beam-related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which outlines the steps I'm currently using. The exception's signature looks like this:
   Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing special you need to do to set up PyFlink. I have manually checked that this class should be there (inside flink-python_2.11-1.12.0.jar), so I guess it could be that your environment isn't clean enough?

I guess you could check the following things:
1) Is it because you have installed 1.11.2 before and so the environment is not clean enough? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 1.12.0-rc1 again? You could also manually check that there should be only one flink-python*.jar under directory xxx/site-packages/pyflink/opt/
2) Verify that the class is actually there by the following command: (flink-python_2.11-1.12.0.jar is under directory xxx/site-packages/pyflink/opt/)
jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If this exception still happens, could you share the exception stack?

I found one cause of this problem: I was mixing a Scala 2.12 Flink installation with PyFlink, which ships some 2.11 jars in its opt folder. I think the JVM just skipped the class definitions because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11, it would make sense to point that out in the documentation; I think I never read that and it might be missing. Unfortunately there is still one exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files, the one that comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has to be loaded in the three environments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder of the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?


3. When increasing the size of the input data set I get the following exception and the job is canceled. I tried to increase the resources assigned to Flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in the appendix.

Could you check if there are any other exceptions in the log when this exception happens?

No, I don't think there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look at the attached log files. This problem could be related to question 2; maybe the root cause is a class loading issue as well. What do you think? Attached are three log files, one each for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.


4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug, and I can't really find any valuable examples or documentation on the internet. Currently, instead of creating an ARRAY, I'm just returning a VARCHAR containing a string representation of the array. You can find the relevant code in the appendix.

There are some examples here: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala

This was very helpful. I was able to implement it. There is only one detail missing: is it possible to UNNEST an array of Rows or tuples? It would be really great if I were able to return a list with multiple fields. Currently I'm just putting multiple values into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.


5. How can I obtain the output of the Python interpreter executing the UDF? If I put a print statement in the UDF, I can't see the output in the log of the taskmanager. Is there a way to access it?

You can use standard Python logging in the UDF instead of print. The log output can then be found in the log of the taskmanager.
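
For example, a minimal sketch along the lines of the Pandas UDF from earlier in this thread (written against the 1.12-style func_type parameter; the identity body is just a placeholder):

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.FLOAT(), func_type='pandas')
def forcast(ds_float_series, y):
    # standard logging instead of print(); the Python worker forwards this to the taskmanager log
    logging.info("forcast() received a batch of %d rows", len(y))
    return y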

Thank you! That worked well. I should have checked that without asking.


I hope these aren't too many questions for this thread. If they are, I can still split some of them out into separate threads. Please let me know.
Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

Kind Regards,
Niklas



End of the Taskmanager Log for 2.
###################################################################
taskmanager_1    | 2020-11-15 17:46:53,438 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:5, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,440 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,442 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,444 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:4, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,846 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,849 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring.
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06.
taskmanager_1    | 2020-11-15 17:46:54,371 ERROR org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
taskmanager_1    | java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
taskmanager_1    | Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2
taskmanager_1    |      at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275]
taskmanager_1    |      ... 11 more
###################################################################
Logfiles for 3.
<large-data-set-taskmanager.log>
<large-data-set-jobmanager.log>
<large-data-set-executor.log>

Dockerfile for question 2.
####################################################################
# This image has been built based on the Dockerfile used for the flink image on Docker Hub.
# The only change I applied is that I switched to flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install PyFlink from the 1.12.0-rc1 wheel
# (assumption: the wheel has been copied into the build context; adjust to your setup)
COPY apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl /tmp/
RUN pip install /tmp/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm /tmp/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
####################################################################
Stack Trace for question 3.
####################################################################
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
        ... 17 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
################################################################
Code for question 4.
################################################################
# UDAF signature
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):

# SQL DDL
"create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
"create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"

# SQL INSERT
"INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
################################################################
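
A hedged sketch of how these pieces could be wired together from Python, assuming the DDL strings above are held in my_source_ddl / my_sink_ddl, t_env is the TableEnvironment of the job, and the 1.12-style create_temporary_function / TableResult.wait() calls are available (adjust if your setup differs):

# register the tables and the Pandas UDAF, then run the INSERT and wait for the batch job to finish
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
t_env.create_temporary_function("forcast", forcast)
t_env.execute_sql(
    "INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
).wait()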

On 12. Nov 2020, at 12:53, Dian Fu <[hidden email]> wrote:

Hi Niklas,

Python DataStream API will also be supported in the coming 1.12.0 release [1]. However, its functionality is still limited compared to the Java DataStream API for the time being, e.g. it will only support stateless operations such as map, flat_map, etc.


On 12. Nov 2020, at 19:46, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thank you very much for this valuable response. I already read about the UDAF, but I wasn't aware of the fact that it is possible to return an array and UNNEST it. I will definitely give it a try and hopefully this will solve my issue.

Another question that came to my mind is whether PyFlink supports any other API besides Table and SQL, like the Streaming and Batch APIs. The documentation only covers the Table API, but I'm not sure about that. Can you maybe tell me whether the Table and SQL API is the only one supported by PyFlink?

Kind Regards,
Niklas

 

On 11. Nov 2020, at 15:32, Dian Fu <[hidden email]> wrote:

Hi Niklas,

You are correct that the input/output length of a Pandas UDF must be the same and that Flink will split the input data into multiple bundles for a Pandas UDF, with a non-deterministic bundle size. Both of these limitations are by design, so I guess a Pandas UDF could not meet your requirements.

However, you could take a look at whether the Pandas UDAF [1], which is supported in 1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you could declare the output type of the Pandas UDAF as an array type
- You then need to flatten the aggregation results, e.g. using UNNEST

NOTE: Flink 1.12 is still not released. You could try the PyFlink package of RC1[2] for 1.12.0 or build it yourself according to [3].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions

Regards,
Dian

large-data-set-taskmanager.log (366K)
large-data-set-jobmanager.log (151K)
large-data-set-executor.log (14K)

Re: PyFlink Table API and UDF Limitations

Dian Fu
Hi Niklas,

How can I ingest data into a batch table from Kafka or, even better, Elasticsearch? Kafka only offers a streaming source and Elasticsearch doesn't offer a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a stream into a bounded table is somehow possible? Thank you!

I think you are right that Kafka still doesn't support batch and that there is no ES source for now. Another option is to load the data into a connector which supports batch. Not sure if anybody else has a better idea about this.
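
To make that concrete, one possible shape of such a workaround, sketched under the assumption that the data is first dumped to files and then read via the filesystem connector in batch mode (path, format and schema are made up to match the example in this thread):

# a bounded table backed by files instead of Kafka/Elasticsearch
bounded_source_ddl = """
CREATE TABLE myBoundedSource (
    ds FLOAT,
    riid VARCHAR(100),
    y FLOAT
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///data/devices',
    'format' = 'csv'
)
"""
t_env.execute_sql(bounded_source_ddl)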

I found one cause of this problem: I was mixing a Scala 2.12 Flink installation with PyFlink, which ships some Scala 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11, it would make sense to point that out in the documentation; I think I never read that, so it might be missing. Unfortunately there is still one exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for, and the class is present in both jar files: the one which comes with Flink in the opt folder and the one shipped with PyFlink. You can find the log attached.
I think the main question is which jar file has to be loaded in the three environments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder of the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?

PyFlink comes with built-in jars such as flink-python_2.11-1.12.0.jar, flink-dist_2.11-1.12.0.jar, etc., so you don't need to manually add them (and also shouldn't do that). Could you remove the duplicate jars and try it again?
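
A small hedged helper for that check (my own snippet, not an official tool; FLINK_HOME and the fallback path are assumptions about the Docker setup described above):

import os
import pyflink

def list_jars(folder):
    # jar file names in a folder, or an empty list if the folder doesn't exist
    return sorted(f for f in os.listdir(folder) if f.endswith(".jar")) if os.path.isdir(folder) else []

pyflink_home = os.path.dirname(os.path.abspath(pyflink.__file__))
flink_home = os.environ.get("FLINK_HOME", "/opt/flink")

# duplicated jars or mixed _2.11/_2.12 suffixes between the two lists point to the mismatch discussed here
print("PyFlink bundled jars:", list_jars(os.path.join(pyflink_home, "lib")) + list_jars(os.path.join(pyflink_home, "opt")))
print("Flink installation jars:", list_jars(os.path.join(flink_home, "lib")) + list_jars(os.path.join(flink_home, "opt")))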

No, I don't think there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look at the attached log files. This problem could be related to question 2; maybe the root cause is a class loading issue as well. What do you think? You can find three log files attached: one for the executor, one for the jobmanager and one for the taskmanager. Maybe you can find something useful.

I found one similar issue on the Beam side: https://issues.apache.org/jira/browse/BEAM-6258, which was resolved a long time ago. I'm still trying to reproduce this issue and will let you know if there is any progress. (It would be great if you could help provide an example which easily reproduces this issue.)

This was very helpful. I was able to implement it. There is only one detail missing: is it possible to UNNEST an array of Rows or tuples? It would be really great if I were able to return a list with multiple fields. Currently I'm just putting multiple values into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.

Currently, Pandas UDAF still doesn't support complex types, so I'm afraid you have to put multiple values into a single VARCHAR for now.
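
Until then, a hedged sketch of that VARCHAR workaround (delimiters and field layout are my own choice, not something prescribed in this thread):

import pandas as pd

def encode_forecast(ds: pd.Series, yhat: pd.Series) -> str:
    # pack one "ds:yhat" pair per forecasted point into a single string, pairs separated by ';'
    return ";".join(f"{d}:{v}" for d, v in zip(ds, yhat))

def decode_forecast(packed: str):
    # split the packed VARCHAR back into (ds, yhat) float tuples on the consumer side
    return [tuple(map(float, pair.split(":"))) for pair in packed.split(";")]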

Regards,
Dian



Re: PyFlink Table API and UDF Limitations

Xingbo Huang
Hi Niklas,

Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`,
it does not affect the correctness of the result. The reason is that some resources are released asynchronously when the gRPC server is shut down [1]. After the user classloader has unloaded the class, the asynchronous thread tries to release the resources and throws a ClassNotFoundException, but the data has already been sent downstream, so the correctness of the result is not affected.

Regarding the details of the specific cause, I have explained them in the Flink community [2] and the Beam community [3], and the issue has been fixed on the Flink side. This problem will no longer occur in the upcoming 1.11.3 and 1.12.0 releases.

[1] https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
[2] https://issues.apache.org/jira/browse/FLINK-20284
[3] https://issues.apache.org/jira/browse/BEAM-5397

Best,
Xingbo


Dian Fu <[hidden email]> 于2020年11月16日周一 下午9:10写道:
Hi Niklas,

How can I ingest data in a batch table from Kafka or even better Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch isn't offering a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a Stream to a bounded table is somehow possible? Thank you!

I think you are right that Kafka still doesn't support batch and there is no ES source for now. Another option is you could load the data into a connector which supports batch. Not sure if anybody else has a better idea about this.

I found one cause of this problem and it was mixing a Scala 2.12 Flink installation with PyFlink, which has some 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it would make sense to point that out in the documentation. I think I never read that and it might be missing. Unfortunately there is still one Exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files. The one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has be loaded in in the three envronments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?

PyFlink comes with the built-in jars such as flink-python_2.11-1.12.0.jar, flink-dist_2.11-1.12.0.jar, etc and so you don't need to manually add them(also shouldn't do that). Could you remove the duplicate jars and try it again?

No I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look in the attached log files. This problem could be related to 2., maybe the root cause is a class loading issue as well. What do you think? You can find attached three log files. One for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.

I found one similar issue at Beam side: https://issues.apache.org/jira/browse/BEAM-6258 which has been resolved long time ago. I'm still trying to reproduce this issue and will let you know if there is any progress. (Would be great if you could help to provide an example which could easily reproduce this issue)

This was very helpful. I was able to implement it. There is only one detail missing. Is it possible to UNNEST an array of Rows or tuples? It would be really great if I would be able to return a list with multiple fields. Currently I'm just putting multiple value into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.

Currently, Pandas UDAF still doesn't support complex type and so I'm afraid that you have to put multiple values into a single VARCHAR for now.

Regards,
Dian


在 2020年11月16日,上午2:46,Niklas Wilcke <[hidden email]> 写道:

Hi Dian,

this was very helpful again. To the old questions I will answer inline as well. Unfortunately also one new question popped up.

How can I ingest data in a batch table from Kafka or even better Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch isn't offering a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a Stream to a bounded table is somehow possible? Thank you!

Kind Regards,
Niklas



On 13. Nov 2020, at 16:07, Dian Fu <[hidden email]> wrote:

Hi Niklas,

Good to know that this solution may work for you. Regarding to the questions you raised, please find my reply inline.

Regards,
Dian

在 2020年11月13日,下午8:48,Niklas Wilcke <[hidden email]> 写道:

Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues, which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which will maybe help to answer the questions. I'm currently using my dev machine with Docker and one jobmanager container and one taskmanager container. If needed I can share the whole docker environment, but this would involve some more effort on my side. Here are my five questions.

1. Where can I find connector libraries for 1.12.0-rc1 or some kind of instruction how to build them? I can't find them in the 1.12.0-rc1 release and when I build flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/

Thanks, that worked like a charm!


2. Which steps are needed to properly set up PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for some Beam-related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which outlines the steps I'm currently using. The exception's signature looks like this.
   Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing special you need to do to set up PyFlink. I have manually checked that this class is there (inside flink-python_2.11-1.12.0.jar), so I guess it might be because your environment isn't clean enough.

I guess you could check the following things:
1) Is it because you installed 1.11.2 before and so the environment is not clean? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 1.12.0-rc1? You could also manually check that there is only one flink-python*.jar under the directory xxx/site-packages/pyflink/opt/
2) Verify that the class is actually there with the following command (flink-python_2.11-1.12.0.jar is under the directory xxx/site-packages/pyflink/opt/):
jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If this exception still happens, could you share the exception stack?

I found one cause of this problem: mixing a Scala 2.12 Flink installation with PyFlink, which ships some Scala 2.11 jars in its opt folder. I think the JVM just skipped the class definitions because they weren't compatible. I actually wasn't aware that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11, it would make sense to point that out in the documentation; I don't think I ever read that, so it might be missing. Unfortunately there is still one exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for, and the class is present in both jar files: the one that comes with Flink in the opt folder and the one shipped with PyFlink. You can find the log attached.
I think the main question is which jar file has to be loaded in the three environments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder of the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?


3. When increasing the size of the input data set I get the following exception and the job is canceled. I tried to increase the resources assigned to Flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in the appendix.

Could you check if there are any other exceptions in the log when this exception happens?

No, I don't think there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look at the attached log files. This problem could be related to 2.; maybe the root cause is a class loading issue as well. What do you think? You can find three log files attached, one each for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.


4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug, and I can't really find any valuable examples or documentation on the internet. Currently, instead of creating an ARRAY, I'm just returning a VARCHAR containing a string representation of the array. You can find the relevant code in the appendix.

There are some examples here: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala

This was very helpful. I was able to implement it. There is only one detail missing. Is it possible to UNNEST an array of Rows or tuples? It would be really great if I were able to return a list with multiple fields. Currently I'm just putting multiple values into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.


5. How can I obtain the output of the Python interpreter executing the UDF? If I put a print statement in the UDF I can't see the output in the log of the taskmanager. Is there a way to access it?

You can use standard logging in the Python UDF instead of print. The log output can then be found in the log of the taskmanager.
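For example, a minimal sketch mirroring the forcast UDAF signature from your appendix (the body and the return value are placeholders, not your actual model code):

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
      result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):
    # Output written via the standard logging module ends up in the taskmanager log,
    # while plain print output does not show up there.
    logging.info("forcast called for a group with %d rows", len(y))
    # ... train the model here ...
    return "placeholder"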

Thank you! That worked well. I should have checked that without asking.


I hope these aren't too many questions for this thread. If they are, I can still split some of them out into separate threads. Please let me know.
Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

Kind Regards,
Niklas



End of the Taskmanager Log for 2.
###################################################################
taskmanager_1    | 2020-11-15 17:46:53,438 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:5, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,440 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,442 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,444 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:4, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,846 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,849 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring.
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06.
taskmanager_1    | 2020-11-15 17:46:54,371 ERROR org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
taskmanager_1    | java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
taskmanager_1    | Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2
taskmanager_1    |      at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275]
taskmanager_1    |      ... 11 more
###################################################################
Logfiles for 3.

<large-data-set-taskmanager.log>
<large-data-set-jobmanager.log>
<large-data-set-executor.log>

Dockerfile for question 2.
####################################################################
# This image has been built based on the Dockerfile used for the flink image on Docker Hub.
# The only change I applied is that I switched to Flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install pyflink
# (the apache_flink wheel is assumed to already be present in the image; the step that fetches/copies it is omitted here)
RUN pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
####################################################################
Stack Trace for question 3.
####################################################################
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
        ... 17 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
################################################################
Code for question 4.
################################################################
# UDAF signature
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):

# SQL DDL
"create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
"create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"

# SQL INSERT
"INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
################################################################

On 12. Nov 2020, at 12:53, Dian Fu <[hidden email]> wrote:

Hi Niklas,

The Python DataStream API will also be supported in the coming 1.12.0 release [1]. However, its functionality is still limited for the time being compared to the Java DataStream API, e.g. it will only support stateless operations such as map, flat_map, etc.


On 12. Nov 2020, at 19:46, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thank you very much for this valuable response. I had already read about the UDAF, but I wasn't aware of the fact that it is possible to return an array and UNNEST it. I will definitely give it a try and hopefully this will solve my issue.

Another question that came to my mind is whether PyFlink supports any other APIs besides Table and SQL, like the Streaming and Batch APIs. The documentation only covers the Table API, but I'm not sure about that. Can you maybe tell me whether the Table and SQL API is the only one supported by PyFlink?

Kind Regards,
Niklas

 

On 11. Nov 2020, at 15:32, Dian Fu <[hidden email]> wrote:

Hi Niklas,

You are correct that the input/output length of a Pandas UDF must be of the same size and that Flink will split the input data into multiple bundles for the Pandas UDF, with a non-deterministic bundle size. Both of the above limitations are by design, so I guess a Pandas UDF cannot meet your requirements.

However, you could take a look at whether the Pandas UDAF [1], which is supported in 1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you could declare the output type of the Pandas UDAF as an array type
- You then need to flatten the aggregation results, e.g. using UNNEST (see the sketch below)

NOTE: Flink 1.12 is still not released. You could try the PyFlink package of RC1[2] for 1.12.0 or build it yourself according to [3].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
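A minimal sketch of this approach could look as follows. The function and table names, the placeholder "model" and the assumed sink schema (riid VARCHAR, yhat FLOAT) are illustrative; t_env is the table environment of your job:

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

# Sketch: the Pandas UDAF emits one ARRAY<FLOAT> per group key, which is then
# flattened with UNNEST.
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
      result_type=DataTypes.ARRAY(DataTypes.FLOAT()), func_type='pandas')
def forcast_array(ds_float_series, y):
    # Placeholder: return the input values as the "forecast" for the whole group.
    return [float(v) for v in y]

t_env.register_function("forcast_array", forcast_array)

t_env.execute_sql("""
    INSERT INTO mySink
    SELECT g.riid, t.yhat
    FROM (
        SELECT riid, forcast_array(ds, y) AS yhats
        FROM mySource
        GROUP BY riid
    ) AS g
    CROSS JOIN UNNEST(g.yhats) AS t (yhat)
""")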

Regards,
Dian

On 11. Nov 2020, at 21:03, Niklas Wilcke <[hidden email]> wrote:

Hi Flink Community,

I'm currently trying to implement a parallel machine learning job with Flink. The goal is to train models in parallel for independent time series in the same data stream. For that purpose I'm using a Python library, which lead me to PyFlink. Let me explain the use case a bit more.
I want to implement a batch job, which partitions/groups the data by a device identifier. After that I need to process the data for each device all at once. There is no way to iteratively train the model unfortunately. The challenge I'm facing is to guarantee that all data belonging to a certain device is processed in one single step. I'm aware of the fact that this does not scale well, but for a reasonable amount of input data per device it should be fine from my perspective.
I investigated a lot and I ended up using the Table API and Pandas UDF, which roughly fulfil my requirements, but there are the following limitations left, which I wanted to talk about.

1. Pandas UDF takes multiple Series as input parameters, which is fine for my purpose, but as far as I can see there is no way to guarantee that the chunk of data in the Series is "complete". Flink will slice the Series and maybe call the UDF multiple times for each device. As far as I can see there are some config options like "python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time", which might help, but I'm not sure, whether this is the right path to take.
2. The length of the input Series needs to be of the same size as the output Series, which isn't nice for my use case. What I would like to do is to process n rows and emit m rows. There shouldn't be any dependency between the number of input rows and the number of output rows.

3. How do I partition the data stream. The Table API offers a groupby, but this doesn't serve my purpose, because I don't want to aggregate all the grouped lines. Instead as stated above I want to emit m result lines per group. Are there other options using the Table API or any other API to do this kind of grouping. I would need something like a "keyBy()" from the streaming API. Maybe this can be combined? Can I create a separate table for each key?

I'm also open to ideas for a completely different approach not using the Table API or Pandas UDF. Any idea is welcome.

You can find a condensed version of the source code attached.

Kind Regards,
Niklas



#############################################################

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
    result_type=DataTypes.FLOAT(), udf_type='pandas')
def forcast(ds_float_series, y):

   # Train the model and create the forcast

   yhat_ts = forcast['yhat'].tail(input_size)
   return yhat_ts

t_env.register_function("forcast", forcast)

# Define sink and source here

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

# TODO: key_by instead of filter
t_env.from_path('mySource') \
   .where("riid === 'r1i1'") \
   .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
   .insert_into('mySink')

t_env.execute("pandas_udf_demo")

#############################################################

Reply | Threaded
Open this post in threaded view
|

Re: PyFlink Table API and UDF Limitations

Niklas Wilcke
Hi Xingbo,

thanks for taking care and letting me know. I was about to share an example of how to reproduce this.
Now I will wait for the next release candidate and give it a try.

Regards,
Niklas


--
[hidden email]
Mobile: +49 160 9793 2593
Office: +49 40 2380 6523

Simon-von-Utrecht-Straße 85a
20359 Hamburg

UNIBERG GmbH
Register court: Amtsgericht Kiel, HRB SE-1507
Managing directors: Andreas Möller, Martin Ulbricht

Information per Art. 13 GDPR (B2B):
For communicating with you, we may process your personal data.
All information on how we handle your data can be found at https://www.uniberg.com/impressum.html.

On 26. Nov 2020, at 02:59, Xingbo Huang <[hidden email]> wrote:

Hi Niklas,

Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`,
it does not affect the correctness of the result. The reason is that some resources are released asynchronously when the gRPC server is shut down [1]. After the user classloader unloads the class, the asynchronous thread tries to release the resources and throws a ClassNotFoundException, but the data result has already been sent downstream, so the correctness of the result is not affected.

Regarding the details of the specific causes, I have explained them in the Flink community [2] and the Beam community [3], and fixed them on the Flink side. There will be no such problem in the upcoming 1.11.3 and 1.12.0 releases.

[1] https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
[2] https://issues.apache.org/jira/browse/FLINK-20284
[3] https://issues.apache.org/jira/browse/BEAM-5397

Best,
Xingbo


On Mon, 16. Nov 2020, at 21:10, Dian Fu <[hidden email]> wrote:
Hi Niklas,

How can I ingest data in a batch table from Kafka or even better Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch isn't offering a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a Stream to a bounded table is somehow possible? Thank you!

I think you are right that the Kafka connector still doesn't support batch and that there is no Elasticsearch source for now. Another option is to load the data into a connector which supports batch. I'm not sure if anybody else has a better idea about this.
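For example, if the data can be exported to files first, a bounded filesystem source in batch mode could be a starting point. This is just a rough sketch; the path, schema and CSV format are assumptions, not taken from your setup:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Sketch of the workaround: read pre-exported files with the filesystem connector
# in batch mode instead of reading Kafka/Elasticsearch directly.
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(env_settings)

t_env.execute_sql("""
    CREATE TABLE mySource (
        ds FLOAT,
        riid VARCHAR(100),
        y FLOAT
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file:///data/devices',
        'format' = 'csv'
    )
""")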

I found one cause of this problem and it was mixing a Scala 2.12 Flink installation with PyFlink, which has some 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it would make sense to point that out in the documentation. I think I never read that and it might be missing. Unfortunately there is still one Exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files. The one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has to be loaded in the three environments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder of the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?

PyFlink comes with the built-in jars such as flink-python_2.11-1.12.0.jar, flink-dist_2.11-1.12.0.jar, etc and so you don't need to manually add them(also shouldn't do that). Could you remove the duplicate jars and try it again?

No I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look in the attached log files. This problem could be related to 2., maybe the root cause is a class loading issue as well. What do you think? You can find attached three log files. One for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.

I found one similar issue at Beam side: https://issues.apache.org/jira/browse/BEAM-6258 which has been resolved long time ago. I'm still trying to reproduce this issue and will let you know if there is any progress. (Would be great if you could help to provide an example which could easily reproduce this issue)

This was very helpful. I was able to implement it. There is only one detail missing. Is it possible to UNNEST an array of Rows or tuples? It would be really great if I would be able to return a list with multiple fields. Currently I'm just putting multiple value into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.

Currently, Pandas UDAF still doesn't support complex type and so I'm afraid that you have to put multiple values into a single VARCHAR for now.

Regards,
Dian


On 16. Nov 2020, at 02:46, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

this was very helpful again. To the old questions I will answer inline as well. Unfortunately also one new question popped up.

How can I ingest data in a batch table from Kafka or even better Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch isn't offering a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a Stream to a bounded table is somehow possible? Thank you!

Kind Regards,
Niklas



On 13. Nov 2020, at 16:07, Dian Fu <[hidden email]> wrote:

Hi Niklas,

Good to know that this solution may work for you. Regarding to the questions you raised, please find my reply inline.

Regards,
Dian

On 13. Nov 2020, at 20:48, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues, which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which will maybe help to answer the questions. I'm currently using my dev machine with Docker and one jobmanager container and one taskmanager container. If needed I can share the whole docker environment, but this would involve some more effort on my side. Here are my five questions.

1. Where can I find connector libraries for 1.12.0-rc1 or some kind of instruction how to build them? I can't find them in the 1.12.0-rc1 release and when I build flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/

Thanks that worked like a charm!


2. Which steps are needed to properly Setup PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for some Beam related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which lines out the steps I'm currently using. The Exceptions signature looks like this.
   Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing specially need to do to set up PyFlink. I have manually checked that this class should be there(inside flink-python_2.11-1.12.0.jar) and so guess if it's because you environment isn't clean enough? 

I guess you could check the following things:
1) Is it because you have installed 1.11.2 before and so the environment is not clean enough? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 1.12.0-rc1 again? You could also manually check that there should be only one flink-python*.jar under directory xxx/site-packages/pyflink/opt/
2) Verify that the class is actually there by the following command: (flink-python_2.11-1.12.0.jar is under directory xxx/site-packages/pyflink/opt/)
jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If this exception still happens, could you share the exception stack?

I found one cause of this problem and it was mixing a Scala 2.12 Flink installation with PyFlink, which has some 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it would make sense to point that out in the documentation. I think I never read that and it might be missing. Unfortunately there is still one Exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files. The one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has to be loaded in the three environments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder of the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?


3. When increasing the size of the input data set I get the following exception and the job is canceled. I tried to increase the resources assigned to Flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in the appendix.

Could you check if there are any other exceptions in the log when this exception happens?

No I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look in the attached log files. This problem could be related to 2., maybe the root cause is a class loading issue as well. What do you think? You can find attached three log files. One for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.


4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug, and I can't really find any valuable examples or documentation on the internet. Currently, instead of creating an ARRAY, I'm just returning a VARCHAR containing a string representation of the array. You can find the relevant code in the appendix.

There are some examples here: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala

This was very helpful. I was able to implement it. There is only one detail missing. Is it possible to UNNEST an array of Rows or tuples? It would be really great if I would be able to return a list with multiple fields. Currently I'm just putting multiple value into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.


5. How can I obtain the output of the Python interpreter executing the UDF. If I put a print statement in the UDF I can't see the output in the log of the taskmanager. Is there a way to access it?

You can use the standard logging in Python UDF instead of print. The log output could then be found in the log of the task manager.

Thank you! That worked well. I should have checked that without asking.


I hope these aren't too many questions for this thread. If this is the case I can still split some of them out. Please let me know, if this is the case.
Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

Kind Regards,
Niklas



End of the Taskmanager Log for 2.
###################################################################
taskmanager_1    | 2020-11-15 17:46:53,438 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:5, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,440 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,442 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,444 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:4, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,846 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,849 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring.
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06.
taskmanager_1    | 2020-11-15 17:46:54,371 ERROR org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
taskmanager_1    | java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
taskmanager_1    | Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2
taskmanager_1    |      at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275]
taskmanager_1    |      ... 11 more
###################################################################
Logfiles for 3.

<large-data-set-taskmanager.log>
<large-data-set-jobmanager.log>
<large-data-set-executor.log>

Dockerfile for question 2.
####################################################################
# This image has been built based on the Dockerfile used for the flink image on Docker Hub.
# The only change I applied is that I switched to Flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install pyflink
# (the apache_flink wheel is assumed to already be present in the image; the step that fetches/copies it is omitted here)
RUN pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
####################################################################
Stack Trace for question 3.
####################################################################
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
        ... 17 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
################################################################
Code for question 4.
################################################################
# UDAF signature
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):

# SQL DDL
"create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
"create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"

# SQL INSERT
"INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
################################################################

On 12. Nov 2020, at 12:53, Dian Fu <[hidden email]> wrote:

Hi Niklas,

The Python DataStream API will also be supported in the coming 1.12.0 release [1]. However, its functionality is still limited for the time being compared to the Java DataStream API, e.g. it will only support stateless operations such as map, flat_map, etc.


On 12. Nov 2020, at 19:46, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thank you very much for this valuable response. I already read about the UDAF, but I wasn't aware of the fact that it is possible to return and UNNEST an array. I will definitely have a try and hopefully this will solve my issue.

Another question that came up to my mind is whether PyFlink supports any other API except Table and SQL, like the Streaming and Batch API. The documentation is only covering the Table API, but I'm not sure about that. Can you maybe tell me whether the Table and SQL API is the only one supported by PyFlink?

Kind Regards,
Niklas

 

On 11. Nov 2020, at 15:32, Dian Fu <[hidden email]> wrote:

Hi Niklas,

You are correct that the input/output length of a Pandas UDF must be of the same size and that Flink will split the input data into multiple bundles for the Pandas UDF, with a non-deterministic bundle size. Both of the above limitations are by design, so I guess a Pandas UDF cannot meet your requirements.

However, you could take a look at whether the Pandas UDAF [1], which is supported in 1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you could declare the output type of the Pandas UDAF as an array type
- You then need to flatten the aggregation results, e.g. using UNNEST

NOTE: Flink 1.12 is still not released. You could try the PyFlink package of RC1[2] for 1.12.0 or build it yourself according to [3].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions

Regards,
Dian

On 11. Nov 2020, at 21:03, Niklas Wilcke <[hidden email]> wrote:

Hi Flink Community,

I'm currently trying to implement a parallel machine learning job with Flink. The goal is to train models in parallel for independent time series in the same data stream. For that purpose I'm using a Python library, which lead me to PyFlink. Let me explain the use case a bit more.
I want to implement a batch job, which partitions/groups the data by a device identifier. After that I need to process the data for each device all at once. There is no way to iteratively train the model unfortunately. The challenge I'm facing is to guarantee that all data belonging to a certain device is processed in one single step. I'm aware of the fact that this does not scale well, but for a reasonable amount of input data per device it should be fine from my perspective.
I investigated a lot and I ended up using the Table API and Pandas UDF, which roughly fulfil my requirements, but there are the following limitations left, which I wanted to talk about.

1. Pandas UDF takes multiple Series as input parameters, which is fine for my purpose, but as far as I can see there is no way to guarantee that the chunk of data in the Series is "complete". Flink will slice the Series and maybe call the UDF multiple times for each device. As far as I can see there are some config options like "python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time", which might help, but I'm not sure, whether this is the right path to take.
2. The length of the input Series needs to be of the same size as the output Series, which isn't nice for my use case. What I would like to do is to process n rows and emit m rows. There shouldn't be any dependency between the number of input rows and the number of output rows.

3. How do I partition the data stream. The Table API offers a groupby, but this doesn't serve my purpose, because I don't want to aggregate all the grouped lines. Instead as stated above I want to emit m result lines per group. Are there other options using the Table API or any other API to do this kind of grouping. I would need something like a "keyBy()" from the streaming API. Maybe this can be combined? Can I create a separate table for each key?

I'm also open to ideas for a completely different approach not using the Table API or Pandas UDF. Any idea is welcome.

You can find a condensed version of the source code attached.

Kind Regards,
Niklas



#############################################################

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
    result_type=DataTypes.FLOAT(), udf_type='pandas')
def forcast(ds_float_series, y):

   # Train the model and create the forcast

   yhat_ts = forcast['yhat'].tail(input_size)
   return yhat_ts

t_env.register_function("forcast", forcast)

# Define sink and source here

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

# TODO: key_by instead of filter
t_env.from_path('mySource') \
   .where("riid === 'r1i1'") \
   .select("ds, riid, y, forcast(ds, y) as yhat_90d") \
   .insert_into('mySink')

t_env.execute("pandas_udf_demo")

#############################################################



Reply | Threaded
Open this post in threaded view
|

Re: PyFlink Table API and UDF Limitations

Xingbo Huang
Hi Niklas,

Thanks a lot for supporting PyFlink. In fact, your requirement of multiple inputs and multiple outputs is essentially a Table Aggregation Function [1]. Although PyFlink does not support it yet, we have listed it in the 1.13 release plan. In addition, row-based operations [2], which are very user-friendly for machine learning users, are also included in the 1.13 plan.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Best,
Xingbo

On 26. Nov 2020, at 17:11, Niklas Wilcke <[hidden email]> wrote:
Hi Xingbo,

thanks for taking care and letting me know. I was about to share an example, how to reproduce this.
Now I will wait for the next release candidate and give it a try.

Regards,
Niklas


--
[hidden email]
Mobile: +49 160 9793 2593
Office: +49 40 2380 6523

Simon-von-Utrecht-Straße 85a
20359 Hamburg

UNIBERG GmbH
Register court: Amtsgericht Kiel HRB SE-1507
Managing directors: Andreas Möller, Martin Ulbricht

Information according to Art. 13 GDPR (B2B):
For communication with you we may process your personal data.
All information on how we handle your data can be found at https://www.uniberg.com/impressum.html

On 26. Nov 2020, at 02:59, Xingbo Huang <[hidden email]> wrote:

Hi Niklas,

Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`,
it does not affect the correctness of the result. The reason is that some resources are released asynchronously when the gRPC server is shut down[1]. After the user classloader unloads the class, the asynchronous thread tries to release the resources and throws a ClassNotFoundException, but the data has already been sent downstream, so the correctness of the result is not affected.

Regarding the details of the specific causes, I have explained them in the Flink community[2] and the Beam community[3], and they have been fixed on the Flink side. The problem will no longer occur in the upcoming releases 1.11.3 and 1.12.0.

[1] https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
[2] https://issues.apache.org/jira/browse/FLINK-20284
[3] https://issues.apache.org/jira/browse/BEAM-5397

Best,
Xingbo


On 16. Nov 2020, at 21:10, Dian Fu <[hidden email]> wrote:
Hi Niklas,

How can I ingest data into a batch table from Kafka or, even better, Elasticsearch? Kafka only offers a streaming source and Elasticsearch doesn't offer a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single, very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a stream into a bounded table is somehow possible? Thank you!

I think you are right that Kafka still doesn't support batch and that there is no ES source for now. Another option is to load the data into a connector that supports batch. Not sure if anybody else has a better idea about this.
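
For illustration, a minimal sketch of such a workaround using the filesystem connector as a bounded source (the path is a placeholder and the schema just follows the DDL shown later in this thread):

#############################################################

# Hedged sketch: stage the Kafka/Elasticsearch data as files first, then read
# them back as a bounded table via the filesystem connector. `t_env` is assumed
# to be the (Stream)TableEnvironment from the other snippets in this thread.
bounded_source_ddl = """
    CREATE TABLE myBoundedSource (
        ds FLOAT,
        riid VARCHAR(100),
        y FLOAT
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/exported-device-data',
        'format' = 'csv'
    )
"""
t_env.execute_sql(bounded_source_ddl)

#############################################################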

I found one cause of this problem: it was mixing a Scala 2.12 Flink installation with PyFlink, which ships some Scala 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11, it would make sense to point that out in the documentation; I think I never read that and it might be missing. Unfortunately there is still one exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files: the one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has to be loaded in the three environments (executor, jobmanager, taskmanager). Is it fine not to put the flink-python_2.11-1.12.0.jar into the lib folder of the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?

PyFlink comes with built-in jars such as flink-python_2.11-1.12.0.jar, flink-dist_2.11-1.12.0.jar, etc., so you don't need to add them manually (and shouldn't do that). Could you remove the duplicate jars and try it again?
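
For illustration, a small helper (an editorial sketch, not an official check) that lists the jars bundled with the installed PyFlink package, which makes duplicates easy to spot:

#############################################################

# Run this with the same Python interpreter that the Flink processes use.
import os
import pyflink

pyflink_home = os.path.dirname(os.path.abspath(pyflink.__file__))
for sub in ("lib", "opt"):
    folder = os.path.join(pyflink_home, sub)
    if os.path.isdir(folder):
        jars = sorted(f for f in os.listdir(folder) if f.endswith(".jar"))
        print(sub, jars)

#############################################################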

No, I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look at the attached log files. This problem could be related to 2.; maybe the root cause is a class loading issue as well. What do you think? You can find three log files attached: one for the executor, one for the jobmanager and one for the taskmanager. Maybe you can find something useful.

I found one similar issue on the Beam side: https://issues.apache.org/jira/browse/BEAM-6258, which was resolved a long time ago. I'm still trying to reproduce this issue and will let you know if there is any progress. (It would be great if you could help to provide an example which easily reproduces this issue.)

This was very helpful. I was able to implement it. There is only one detail missing: is it possible to UNNEST an array of Rows or tuples? It would be really great if I were able to return a list with multiple fields. Currently I'm just putting multiple values into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.

Currently, Pandas UDAFs still don't support complex result types, so I'm afraid that you have to put multiple values into a single VARCHAR for now.
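
For illustration, a minimal sketch of that workaround (the names and the trivial stand-in "model" are made up; the real forecast code is elided in this thread):

#############################################################

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

# Hedged sketch: the per-group forecast values are packed into one delimited
# VARCHAR because complex result types are not yet supported for Pandas UDAFs.
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
      result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast_packed(ds_float_series, y):
    # Placeholder "model": a rolling mean instead of the real training code.
    yhat = y.rolling(3, min_periods=1).mean()
    # Pack the values; the consumer has to split the string again downstream.
    return ";".join(f"{v:.4f}" for v in yhat)

#############################################################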

Regards,
Dian


On 16. Nov 2020, at 02:46, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

this was very helpful again. I will answer the old questions inline as well. Unfortunately, one new question popped up, too.

How can I ingest data into a batch table from Kafka or, even better, Elasticsearch? Kafka only offers a streaming source and Elasticsearch doesn't offer a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single, very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a stream into a bounded table is somehow possible? Thank you!

Kind Regards,
Niklas



On 13. Nov 2020, at 16:07, Dian Fu <[hidden email]> wrote:

Hi Niklas,

Good to know that this solution may work for you. Regarding the questions you raised, please find my replies inline.

Regards,
Dian

On 13. Nov 2020, at 20:48, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thanks again for your response. In the meantime I have tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues, which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which might help to answer the questions. I'm currently using my dev machine with Docker, one jobmanager container and one taskmanager container. If needed I can share the whole Docker environment, but this would involve some more effort on my side. Here are my five questions.

1. Where can I find connector libraries for 1.12.0-rc1 or some kind of instruction on how to build them? I can't find them in the 1.12.0-rc1 release, and when I build Flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/

Thanks, that worked like a charm!


2. Which steps are needed to properly set up PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for some Beam-related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which outlines the steps I'm currently using. The exception's signature looks like this:
   Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing special you need to do to set up PyFlink. I have manually checked that this class should be there (inside flink-python_2.11-1.12.0.jar), so I guess your environment isn't clean enough?

I guess you could check the following things:
1) Is it because you have installed 1.11.2 before and so the environment is not clean enough? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 1.12.0-rc1 again? You could also manually check that there should be only one flink-python*.jar under directory xxx/site-packages/pyflink/opt/
2) Verify that the class is actually there by the following command: (flink-python_2.11-1.12.0.jar is under directory xxx/site-packages/pyflink/opt/)
jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If this exception still happens, could you share the exception stack?

I found one cause of this problem: it was mixing a Scala 2.12 Flink installation with PyFlink, which ships some Scala 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11, it would make sense to point that out in the documentation; I think I never read that and it might be missing. Unfortunately there is still one exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files: the one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has to be loaded in the three environments (executor, jobmanager, taskmanager). Is it fine not to put the flink-python_2.11-1.12.0.jar into the lib folder of the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?


3. When increasing the size of the input data set I get the following exception and the job is canceled. I tried to increase the resources assigned to Flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in the appendix.

Could you check if there are any other exceptions in the log when this exception happens?

No, I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look at the attached log files. This problem could be related to 2.; maybe the root cause is a class loading issue as well. What do you think? You can find three log files attached: one for the executor, one for the jobmanager and one for the taskmanager. Maybe you can find something useful.


4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug, and I can't really find any valuable examples or documentation on the internet. Currently, instead of creating an ARRAY, I'm just returning a VARCHAR containing a string representation of the array. You can find the relevant code in the appendix.

There are some examples here: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala
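
For illustration, a minimal, hedged UNNEST example in the same style as those test cases (the inlined VALUES row is made up; `t_env` is assumed to be the TableEnvironment from the other snippets in this thread):

#############################################################

# Expand an ARRAY column into one row per element via CROSS JOIN UNNEST.
result = t_env.sql_query("""
    SELECT riid, yhat
    FROM (VALUES ('r1i1', ARRAY[1.0, 2.0, 3.0])) AS src (riid, yhats)
    CROSS JOIN UNNEST(yhats) AS t (yhat)
""")
result.execute().print()

#############################################################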

This was very helpful. I was able to implement it. There is only one detail missing: is it possible to UNNEST an array of Rows or tuples? It would be really great if I were able to return a list with multiple fields. Currently I'm just putting multiple values into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.


5. How can I obtain the output of the Python interpreter executing the UDF? If I put a print statement in the UDF, I can't see the output in the log of the taskmanager. Is there a way to access it?

You can use standard Python logging in the UDF instead of print. The log output can then be found in the log of the taskmanager.
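
For illustration, a minimal sketch of what that might look like (the aggregate itself is a made-up stand-in; only the logging call matters here):

#############################################################

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udaf

# Hedged sketch: messages written via the logging module end up in the
# taskmanager / Python worker logs rather than on stdout.
@udaf(input_types=[DataTypes.FLOAT()], result_type=DataTypes.FLOAT(),
      func_type='pandas')
def mean_with_logging(y):
    logging.info("aggregating a group with %d rows", len(y))
    return y.mean()

#############################################################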

Thank you! That worked well. I should have checked that without asking.


I hope these aren't too many questions for this thread. If it is too much, I can still split some of them out into separate threads. Please let me know.
Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

Kind Regards,
Niklas



End of the Taskmanager Log for 2.
###################################################################
taskmanager_1    | 2020-11-15 17:46:53,438 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:5, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,440 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,442 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,444 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:4, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,846 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,849 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring.
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06.
taskmanager_1    | 2020-11-15 17:46:54,371 ERROR org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
taskmanager_1    | java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
taskmanager_1    | Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2
taskmanager_1    |      at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275]
taskmanager_1    |      ... 11 more
###################################################################
Logfiles for 3.

<large-data-set-taskmanager.log>
<large-data-set-jobmanager.log>
<large-data-set-executor.log>

Dockerfile for question 2.
####################################################################
# This image has been built based on the Dockerfile used for the flink image on Docker Hub.
# The only change I applied is that I switched to flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install pyflink
# (the wheel is assumed to have been added to the image in a preceding step,
#  e.g. via COPY or wget; that step is missing from this condensed Dockerfile)
RUN pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
####################################################################
Stack Trace for question 3.
####################################################################
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
        ... 17 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
################################################################
Code for question 4.
################################################################
# UDAF signature
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):

# SQL DDL
"create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
"create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"

# SQL INSERT
"INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
################################################################

On 12. Nov 2020, at 12:53, Dian Fu <[hidden email]> wrote:

Hi Niklas,

The Python DataStream API will also be supported in the coming 1.12.0 release [1]. However, its functionality is still limited for the time being compared to the Java DataStream API, e.g. it will only support stateless operations such as map, flat_map, etc.
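
For illustration, a minimal sketch of such stateless operations (written against the 1.12 preview API, so treat it as an assumption rather than a verified example; the sink and the env.execute() call are omitted):

#############################################################

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection([("r1i1", 1.0), ("r1i2", 2.0)])

# Stateless transformations; a sink (e.g. via add_sink) still has to be
# attached before calling env.execute() to actually run the pipeline.
doubled = ds.map(lambda t: (t[0], t[1] * 2.0))
labels = doubled.flat_map(lambda t: [t[0], str(t[1])])

#############################################################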


在 2020年11月12日,下午7:46,Niklas Wilcke <[hidden email]> 写道:

Hi Dian,

thank you very much for this valuable response. I had already read about the UDAF, but I wasn't aware of the fact that it is possible to return an array and UNNEST it. I will definitely give it a try and hopefully this will solve my issue.

Another question that came to my mind is whether PyFlink supports any other API besides Table and SQL, like the Streaming and Batch APIs. The documentation only covers the Table API, but I'm not sure about that. Can you maybe tell me whether the Table and SQL API is the only one supported by PyFlink?

Kind Regards,
Niklas

 

On 11. Nov 2020, at 15:32, Dian Fu <[hidden email]> wrote:

Hi Niklas,

You are correct that the input and output of a Pandas UDF must be of the same length and that Flink will split the input data into multiple bundles for the Pandas UDF, with a non-deterministic bundle size. Both of these limitations are by design, so I guess the Pandas UDF could not meet your requirements.

However, you could take a look at whether the Pandas UDAF[1], which is supported in 1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you could declare the output type of the Pandas UDAF as an array type
- You then need to flatten the aggregation results, e.g. using UNNEST

NOTE: Flink 1.12 is still not released. You could try the PyFlink package of RC1[2] for 1.12.0 or build it yourself according to [3].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions

Regards,
Dian




Re: PyFlink Table API and UDF Limitations

Niklas Wilcke
Hi Xingbo,

thanks for sharing. This is very interesting.

Regards,
Niklas

On 27. Nov 2020, at 03:05, Xingbo Huang <[hidden email]> wrote:

Hi Niklas,

Thanks a lot for supporting PyFlink. In fact, your requirement for multiple input and multiple output is essentially Table Aggregation Functions[1]. Although PyFlink does not support it yet, we have listed it in the release 1.13 plan. In addition, row-based operations[2] that are very user-friendly to machine learning users are also included in the 1.13 plan.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Best,
Xingbo

Niklas Wilcke <[hidden email]> 于2020年11月26日周四 下午5:11写道:
Hi Xingbo,

thanks for taking care and letting me know. I was about to share an example, how to reproduce this.
Now I will wait for the next release candidate and give it a try.

Regards,
Niklas


--
[hidden email]
Mobile: +49 160 9793 2593
Office: +49 40 2380 6523

Simon-von-Utrecht-Straße 85a
20359 Hamburg

UNIBERG GmbH
Registergericht: Amtsgericht Kiel HRB SE-1507
Geschäftsführer: Andreas Möller, Martin Ulbricht

Information Art. 13 DSGVO B2B:
Für die Kommunikation mit Ihnen verarbeiten wir ggf. Ihre personenbezogenen Daten.
Alle Informationen zum Umgang mit Ihren Daten finden Sie unter https://www.uniberg.com/impressum.html

On 26. Nov 2020, at 02:59, Xingbo Huang <[hidden email]> wrote:

Hi Niklas,

Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`,
it does not affect the correctness of the result. The reason is that some resources are released asynchronously when Grpc Server is shut down[1] . After the UserClassLoader unloads the class, the asynchronous thread tries to release the resources and throw NotClassFoundException, but the content of the data result has been sent downstream, so the correctness of the result will not be affected.

Regarding the details of the specific causes, I have explained in the flink community[2] and the beam community[3], and fixed them in the flink community. There will be no such problem in the next version of release 1.11.3 and 1.12.0.

[1] https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150
[2] https://issues.apache.org/jira/browse/FLINK-20284
[3] https://issues.apache.org/jira/browse/BEAM-5397

Best,
Xingbo


Dian Fu <[hidden email]> 于2020年11月16日周一 下午9:10写道:
Hi Niklas,

How can I ingest data in a batch table from Kafka or even better Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch isn't offering a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a Stream to a bounded table is somehow possible? Thank you!

I think you are right that Kafka still doesn't support batch and there is no ES source for now. Another option is you could load the data into a connector which supports batch. Not sure if anybody else has a better idea about this.

I found one cause of this problem and it was mixing a Scala 2.12 Flink installation with PyFlink, which has some 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it would make sense to point that out in the documentation. I think I never read that and it might be missing. Unfortunately there is still one Exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files. The one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has be loaded in in the three envronments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?

PyFlink comes with the built-in jars such as flink-python_2.11-1.12.0.jar, flink-dist_2.11-1.12.0.jar, etc and so you don't need to manually add them(also shouldn't do that). Could you remove the duplicate jars and try it again?

No I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look in the attached log files. This problem could be related to 2., maybe the root cause is a class loading issue as well. What do you think? You can find attached three log files. One for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.

I found one similar issue at Beam side: https://issues.apache.org/jira/browse/BEAM-6258 which has been resolved long time ago. I'm still trying to reproduce this issue and will let you know if there is any progress. (Would be great if you could help to provide an example which could easily reproduce this issue)

This was very helpful. I was able to implement it. There is only one detail missing. Is it possible to UNNEST an array of Rows or tuples? It would be really great if I would be able to return a list with multiple fields. Currently I'm just putting multiple value into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.

Currently, Pandas UDAF still doesn't support complex type and so I'm afraid that you have to put multiple values into a single VARCHAR for now.

Regards,
Dian


在 2020年11月16日,上午2:46,Niklas Wilcke <[hidden email]> 写道:

Hi Dian,

this was very helpful again. To the old questions I will answer inline as well. Unfortunately also one new question popped up.

How can I ingest data in a batch table from Kafka or even better Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch isn't offering a source at all.
The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single very large window to create a bounded table. Do you think that would work?
Are there other options available? Maybe converting a Stream to a bounded table is somehow possible? Thank you!

Kind Regards,
Niklas



On 13. Nov 2020, at 16:07, Dian Fu <[hidden email]> wrote:

Hi Niklas,

Good to know that this solution may work for you. Regarding to the questions you raised, please find my reply inline.

Regards,
Dian

在 2020年11月13日,下午8:48,Niklas Wilcke <[hidden email]> 写道:

Hi Dian,

thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues, which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which will maybe help to answer the questions. I'm currently using my dev machine with Docker and one jobmanager container and one taskmanager container. If needed I can share the whole docker environment, but this would involve some more effort on my side. Here are my five questions.

1. Where can I find connector libraries for 1.12.0-rc1 or some kind of instruction how to build them? I can't find them in the 1.12.0-rc1 release and when I build flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka.

You could download the connector jars of 1.12.0-rc1 from here: https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/

Thanks that worked like a charm!


2. Which steps are needed to properly Setup PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for some Beam related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which lines out the steps I'm currently using. The Exceptions signature looks like this.
   Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1

Usually there is nothing specially need to do to set up PyFlink. I have manually checked that this class should be there(inside flink-python_2.11-1.12.0.jar) and so guess if it's because you environment isn't clean enough? 

I guess you could check the following things:
1) Is it because you have installed 1.11.2 before and so the environment is not clean enough? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 1.12.0-rc1 again? You could also manually check that there should be only one flink-python*.jar under directory xxx/site-packages/pyflink/opt/
2) Verify that the class is actually there by the following command: (flink-python_2.11-1.12.0.jar is under directory xxx/site-packages/pyflink/opt/)
jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena"
3) If this exception still happens, could you share the exception stack?

I found one cause of this problem and it was mixing a Scala 2.12 Flink installation with PyFlink, which has some 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it would make sense to point that out in the documentation. I think I never read that and it might be missing. Unfortunately there is still one Exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files. The one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached.
I think the main question is which jar file has be loaded in in the three envronments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?


3. When increasing the size of the input data set I get the following Exception and the job is canceled. I tried to increase the resources assigned to flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in apendix.

Could you check if there are any other exceptions in the log when this exception happens?

No I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look in the attached log files. This problem could be related to 2., maybe the root cause is a class loading issue as well. What do you think? You can find attached three log files. One for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.


4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug it and I can't really find any valuable examples or documentation on the internet. Currently instead of creating an ARRAY I'm just returning a VARCHAR containing a string representation of the array. The relevant code you can find in the apendix.

There are some examples here: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnnestITCase.scala

This was very helpful. I was able to implement it. There is only one detail missing. Is it possible to UNNEST an array of Rows or tuples? It would be really great if I would be able to return a list with multiple fields. Currently I'm just putting multiple value into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.


5. How can I obtain the output of the Python interpreter executing the UDF. If I put a print statement in the UDF I can't see the output in the log of the taskmanager. Is there a way to access it?

You can use the standard logging in Python UDF instead of print. The log output could then be found in the log of the task manager.

Thank you! That worked well. I should have checked that without asking.


I hope these aren't too many questions for this thread. If this is the case I can still split some of them out. Please let me know, if this is the case.
Thank you very much. I really appreciate your help.

It's fine to reuse this thread. :)

Kind Regards,
Niklas



End of the Taskmanager Log for 2.
###################################################################
taskmanager_1    | 2020-11-15 17:46:53,438 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:5, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,440 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,442 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,444 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:4, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,846 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,849 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: ba4e3974860af7dc00a28fdfbb44fe06).
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring.
taskmanager_1    | 2020-11-15 17:46:53,851 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06.
taskmanager_1    | 2020-11-15 17:46:54,371 ERROR org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down?
taskmanager_1    | java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0]
taskmanager_1    |      at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
taskmanager_1    | Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2
taskmanager_1    |      at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
taskmanager_1    |      at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275]
taskmanager_1    |      ... 11 more
###################################################################
Logfiles for 3.

<large-data-set-taskmanager.log>
<large-data-set-jobmanager.log>
<large-data-set-executor.log>

Dockerfile for question 2.
####################################################################
# This image has been build based on the Dockerfile used for the flink image on docker hub.
# The only change I applied is that I switched to flink 1.12.0-rc1.
FROM flink:1.12.0-rc1-scala_2.12

# Install python
# TODO: Minimize dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    python3-dev \
    zip \
  && rm -rf /var/lib/apt/lists/* \
  && ln -s /usr/bin/python3 /usr/bin/python \
  && ln -s /usr/bin/pip3 /usr/bin/pip

# Install pyflink
  && pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \
  && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl
####################################################################
Stack Trace for question 3.
####################################################################
Caused by: java.lang.RuntimeException: Failed to close remote bundle
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283)
        at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94)
        at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
        at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366)
        ... 17 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
################################################################
Code for question 4.
################################################################
# UDAF signature
@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
     result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):

# SQL DDL
"create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )"
"create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )"

# SQL INSERT
"INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
################################################################
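
For reference, here is a minimal sketch of how the fragments above could be assembled into one job. The function body, the filesystem/CSV connector options and the batch-mode setup are placeholders for illustration, not the original implementation.

################################################################
# Sketch only: wires the question-4 fragments together (PyFlink 1.12 assumed).
import json

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udaf

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(env_settings)

@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
      result_type=DataTypes.VARCHAR(10000), func_type='pandas')
def forcast(ds_float_series, y):
    # Placeholder body: train the per-device model here and serialize the
    # forecast into one string per group, e.g. as JSON.
    return json.dumps([float(v) for v in y])

t_env.create_temporary_function("forcast", forcast)

# Placeholder connectors; replace with the real source and sink definitions.
t_env.execute_sql("""
    CREATE TABLE mySource (ds FLOAT, riid VARCHAR(100), y FLOAT)
    WITH ('connector' = 'filesystem', 'path' = '/tmp/in', 'format' = 'csv')
""")
t_env.execute_sql("""
    CREATE TABLE mySink (riid VARCHAR(100), yhatd VARCHAR(10000))
    WITH ('connector' = 'filesystem', 'path' = '/tmp/out', 'format' = 'csv')
""")

t_env.execute_sql(
    "INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid"
).wait()
################################################################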

On 12. Nov 2020, at 12:53, Dian Fu <[hidden email]> wrote:

Hi Niklas,

The Python DataStream API will also be supported in the coming 1.12.0 release [1]. However, its functionality is still limited for the time being compared to the Java DataStream API; e.g., it will only support stateless operations such as map, flat_map, etc.
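
As a rough illustration (a sketch against the 1.12.0 RC API, with made-up sample elements and a placeholder /tmp/output path), such a stateless Python DataStream pipeline could look like this:

# Stateless Python DataStream sketch (PyFlink 1.12.0 RC assumed).
from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# Made-up sample elements, just to have something to transform.
ds = env.from_collection(
    collection=[('r1i1', 1.0), ('r1i2', 2.5)],
    type_info=Types.ROW([Types.STRING(), Types.FLOAT()]))

# Only stateless operations such as map/flat_map are available for now.
lines = ds.map(lambda row: str(row), output_type=Types.STRING())

lines.add_sink(StreamingFileSink
               .for_row_format('/tmp/output', SimpleStringEncoder())
               .build())

env.execute("datastream_sketch")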


On 12. Nov 2020, at 19:46, Niklas Wilcke <[hidden email]> wrote:

Hi Dian,

thank you very much for this valuable response. I had already read about the UDAF, but I wasn't aware that it is possible to return an array and UNNEST it. I will definitely give it a try, and hopefully this will solve my issue.

Another question that came to my mind is whether PyFlink supports any API other than Table and SQL, such as the Streaming and Batch APIs. The documentation only covers the Table API, but I'm not sure about that. Could you tell me whether the Table and SQL APIs are the only ones supported by PyFlink?

Kind Regards,
Niklas

 

On 11. Nov 2020, at 15:32, Dian Fu <[hidden email]> wrote:

Hi Niklas,

You are correct that the input and output of a Pandas UDF must be of the same length, and that Flink will split the input data into multiple bundles for the Pandas UDF, where the bundle size is non-deterministic. Both of these limitations are by design, so I guess a Pandas UDF could not meet your requirements.

However, you could take a look at whether the Pandas UDAF [1], which is supported in 1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you could declare the output type of the Pandas UDAF as an array type.
- You then need to flatten the aggregation results, e.g. using UNNEST (a rough sketch follows below this list).
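
A rough sketch of that pattern (the function and sink names, the placeholder body and the existing t_env are assumptions for illustration, not working forecasting code):

# Array-returning Pandas UDAF plus UNNEST (PyFlink 1.12 assumed).
from pyflink.table import DataTypes
from pyflink.table.udf import udaf

@udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
      result_type=DataTypes.ARRAY(DataTypes.FLOAT()),
      func_type='pandas')
def forcast_per_device(ds_float_series, y):
    # Placeholder: a real implementation would train the model on the whole
    # group here and return the forecast values as one array per group key.
    return [float(v) for v in y]

t_env.create_temporary_function("forcast_per_device", forcast_per_device)

# The GROUP BY produces one array per riid; UNNEST flattens it back into rows.
# "myFlatSink" is a hypothetical sink with columns (riid VARCHAR, yhat FLOAT).
t_env.execute_sql("""
    INSERT INTO myFlatSink
    SELECT s.riid, t.yhat
    FROM (
        SELECT riid, forcast_per_device(ds, y) AS yhats
        FROM mySource
        GROUP BY riid
    ) AS s
    CROSS JOIN UNNEST(s.yhats) AS t (yhat)
""").wait()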

NOTE: Flink 1.12 has not been released yet. You could try the PyFlink package of RC1 [2] for 1.12.0 or build it yourself according to [3].

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions

Regards,
Dian
