Hi Flink Community,
I'm currently trying to implement a parallel machine learning job with Flink. The goal is to train models in parallel for independent time series in the same data stream. For that purpose I'm using a Python library, which lead me to PyFlink. Let me explain the use case a bit more. I want to implement a batch job, which partitions/groups the data by a device identifier. After that I need to process the data for each device all at once. There is no way to iteratively train the model unfortunately. The challenge I'm facing is to guarantee that all data belonging to a certain device is processed in one single step. I'm aware of the fact that this does not scale well, but for a reasonable amount of input data per device it should be fine from my perspective. I investigated a lot and I ended up using the Table API and Pandas UDF, which roughly fulfil my requirements, but there are the following limitations left, which I wanted to talk about. 1. Pandas UDF takes multiple Series as input parameters, which is fine for my purpose, but as far as I can see there is no way to guarantee that the chunk of data in the Series is "complete". Flink will slice the Series and maybe call the UDF multiple times for each device. As far as I can see there are some config options like "python.fn-execution.arrow.batch.size" and "python.fn-execution.bundle.time", which might help, but I'm not sure, whether this is the right path to take. 2. The length of the input Series needs to be of the same size as the output Series, which isn't nice for my use case. What I would like to do is to process n rows and emit m rows. There shouldn't be any dependency between the number of input rows and the number of output rows. 3. How do I partition the data stream. The Table API offers a groupby, but this doesn't serve my purpose, because I don't want to aggregate all the grouped lines. Instead as stated above I want to emit m result lines per group. Are there other options using the Table API or any other API to do this kind of grouping. I would need something like a "keyBy()" from the streaming API. Maybe this can be combined? Can I create a separate table for each key? I'm also open to ideas for a completely different approach not using the Table API or Pandas UDF. Any idea is welcome. You can find a condensed version of the source code attached. Kind Regards, Niklas ############################################################# from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.udf import udf env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True) @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()], result_type=DataTypes.FLOAT(), udf_type='pandas') def forcast(ds_float_series, y): # Train the model and create the forcast yhat_ts = forcast['yhat'].tail(input_size) return yhat_ts t_env.register_function("forcast", forcast) # Define sink and source here t_env.execute_sql(my_source_ddl) t_env.execute_sql(my_sink_ddl) # TODO: key_by instead of filter t_env.from_path('mySource') \ .where("riid === 'r1i1'") \ .select("ds, riid, y, forcast(ds, y) as yhat_90d") \ .insert_into('mySink') t_env.execute("pandas_udf_demo") ############################################################# smime.p7s (4K) Download Attachment |
Hi Niklas, You are correct that the input/output length of Pandas UDF must be of the same size and that Flink will split the input data into multiple bundles for Pandas UDF and the bundle size is non-determinstic. Both of the above two limitations are by design and so I guess Pandas UDF could not meet your requirements. However, you could take a look at if the Pandas UDAF[1] which was supported in 1.12 could meet your requirements: - As group_by only generate one record per group key just as you said, you could declare the output type of Pandas UDAF as an array type - You need then flatten the aggregation results, e.g. using UNNEST NOTE: Flink 1.12 is still not released. You could try the PyFlink package of RC1[2] for 1.12.0 or build it yourself according to [3]. Regards, Dian
|
Hi Dian,
thank you very much for this valuable response. I already read about the UDAF, but I wasn't aware of the fact that it is possible to return and UNNEST an array. I will definitely have a try and hopefully this will solve my issue. Another question that came up to my mind is whether PyFlink supports any other API except Table and SQL, like the Streaming and Batch API. The documentation is only covering the Table API, but I'm not sure about that. Can you maybe tell me whether the Table and SQL API is the only one supported by PyFlink? Kind Regards, Niklas
smime.p7s (4K) Download Attachment |
Hi Niklas, Python DataStream API will also be supported in coming release of 1.12.0 [1]. However, the functionalities are still limited for the time being compared to the Java DataStream API, e.g. it will only support the stateless operations, such as map, flat_map, etc.
|
Hi Dian,
thanks again for your response. In the meantime I tried out your proposal using the UDAF feature of PyFlink 1.12.0-rc1 and it is roughly working, but I am facing some issues, which I would like to address. If this goes too far, please let me know and I will open a new thread for each of the questions. Let me share some more information about my current environment, which will maybe help to answer the questions. I'm currently using my dev machine with Docker and one jobmanager container and one taskmanager container. If needed I can share the whole docker environment, but this would involve some more effort on my side. Here are my five questions. 1. Where can I find connector libraries for 1.12.0-rc1 or some kind of instruction how to build them? I can't find them in the 1.12.0-rc1 release and when I build flink from source, I can't find the connector libraries in the build target. I need flink-sql-connector-elasticsearch7 and flink-sql-connector-kafka. 2. Which steps are needed to properly Setup PyFlink? I followed the instructions, but I always get some ClassNotFoundExceptions for some Beam related classes in the taskmanager. The job still works fine, but this doesn't look good to me. It happens in 1.11.2 and in 1.12.0-rc1. I tried to resolve this by adding certain jars, but I wasn't able to fix it. Maybe you have an idea. You can find the Dockerfile attached, which lines out the steps I'm currently using. The Exceptions signature looks like this. Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1 3. When increasing the size of the input data set I get the following Exception and the job is canceled. I tried to increase the resources assigned to flink, but it didn't help. Do you have an idea why this is happening? You can find a more detailed stack trace in apendix. 4. I can't manage to get the SQL UNNEST operation to work. It is quite hard for me to debug it and I can't really find any valuable examples or documentation on the internet. Currently instead of creating an ARRAY I'm just returning a VARCHAR containing a string representation of the array. The relevant code you can find in the apendix. 5. How can I obtain the output of the Python interpreter executing the UDF. If I put a print statement in the UDF I can't see the output in the log of the taskmanager. Is there a way to access it? I hope these aren't too many questions for this thread. If this is the case I can still split some of them out. Please let me know, if this is the case. Thank you very much. I really appreciate your help. Kind Regards, Niklas Dockerfile for question 2. #################################################################### # This image has been build based on the Dockerfile used for the flink image on docker hub. # The only change I applied is that I switched to flink 1.12.0-rc1. FROM flink:1.12.0-rc1-scala_2.12 # Install python # TODO: Minimize dependencies RUN apt-get update && apt-get install -y \ python3 \ python3-pip \ python3-dev \ zip \ && rm -rf /var/lib/apt/lists/* \ && ln -s /usr/bin/python3 /usr/bin/python \ && ln -s /usr/bin/pip3 /usr/bin/pip # Install pyflink RUN wget --no-verbose https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \ && pip install apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl \ && rm apache_flink-1.12.0-cp37-cp37m-manylinux1_x86_64.whl #################################################################### Stack Trace for question 3. #################################################################### Caused by: java.lang.RuntimeException: Failed to close remote bundle at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:368) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:322) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:283) at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:267) at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.invokeCurrentBatch(BatchArrowPythonGroupAggregateFunctionOperator.java:64) at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.AbstractBatchArrowPythonAggregateFunctionOperator.endInput(AbstractBatchArrowPythonAggregateFunctionOperator.java:94) at org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch.BatchArrowPythonGroupAggregateFunctionOperator.endInput(BatchArrowPythonGroupAggregateFunctionOperator.java:33) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127) at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134) at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412) at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:587) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908) at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458) at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547) at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:366) ... 17 more Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more ################################################################ Code for question 4. ################################################################ # UDAF signature @udaf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()], result_type=DataTypes.VARCHAR(10000), func_type='pandas') def forcast(ds_float_series, y): # SQL DDL "create table mySource (ds FLOAT, riid VARCHAR(100), y FLOAT ) with ( ... )" "create table mySink (riid VARCHAR(100), yhatd VARCHAR(10000)) with ( ... )" # SQL INSERT "INSERT INTO mySink SELECT riid, forcast(ds, y) AS yhat FROM mySource GROUP BY riid" ################################################################
smime.p7s (4K) Download Attachment |
Hi Niklas, Good to know that this solution may work for you. Regarding to the questions you raised, please find my reply inline. Regards, Dian
You could download the connector jars of 1.12.0-rc1 from here: https://repository.apache.org/content/repositories/orgapacheflink-1402/org/apache/flink/
Usually there is nothing specially need to do to set up PyFlink. I have manually checked that this class should be there(inside flink-python_2.11-1.12.0.jar) and so guess if it's because you environment isn't clean enough? I guess you could check the following things: 1) Is it because you have installed 1.11.2 before and so the environment is not clean enough? Could you uninstall PyFlink 1.11.2 manually and reinstall PyFlink 1.12.0-rc1 again? You could also manually check that there should be only one flink-python*.jar under directory xxx/site-packages/pyflink/opt/ 2) Verify that the class is actually there by the following command: (flink-python_2.11-1.12.0.jar is under directory xxx/site-packages/pyflink/opt/) jar tf flink-python_2.11-1.12.0.jar | grep "org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena" 3) If this exception still happens, could you share the exception stack?
Could you check if there are any other exceptions in the log when this exception happens?
You can use the standard logging in Python UDF instead of print. The log output could then be found in the log of the task manager.
It's fine to reuse this thread. :)
|
Hi Dian,
this was very helpful again. To the old questions I will answer inline as well. Unfortunately also one new question popped up. How can I ingest data in a batch table from Kafka or even better Elasticsearch. Kafka is only offering a Streaming source and Elasticsearch isn't offering a source at all. The only workaround which comes to my mind is to use the Kafka streaming source and to apply a single very large window to create a bounded table. Do you think that would work? Are there other options available? Maybe converting a Stream to a bounded table is somehow possible? Thank you! Kind Regards, Niklas
Thanks that worked like a charm!
I found one cause of this problem and it was mixing a Scala 2.12 Flink installation with PyFlink, which has some 2.11 jars in its opt folder. I think the JVM just skipped the class definitions, because they weren't compatible. I actually wasn't aware of the fact that PyFlink comes with prebuilt jar dependencies. If PyFlink is only compatible with Scala 2.11 it would make sense to point that out in the documentation. I think I never read that and it might be missing. Unfortunately there is still one Exception showing up at the very end of the job in the taskmanager log. I did the verification you asked for and the class is present in both jar files. The one which comes with Flink in the opt folder and the one of PyFlink. You can find the log attached. I think the main question is which jar file has be loaded in in the three envronments (executor, jobmanager, taskmanager). Is it fine to not put the flink-python_2.11-1.12.0.jar into the lib folder in the jobmanager and taskmanager? Will it somehow be transferred by PyFlink to the jobmanager and taskmanager?
No I don't think that there are additional exceptions besides "org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException", but maybe take a look in the attached log files. This problem could be related to 2., maybe the root cause is a class loading issue as well. What do you think? You can find attached three log files. One for the executor, the jobmanager and the taskmanager. Maybe you can find something useful.
This was very helpful. I was able to implement it. There is only one detail missing. Is it possible to UNNEST an array of Rows or tuples? It would be really great if I would be able to return a list with multiple fields. Currently I'm just putting multiple value into a single VARCHAR, but that means the information needs to be extracted later on. Maybe you have an idea how to avoid that.
Thank you! That worked well. I should have checked that without asking.
End of the Taskmanager Log for 2. ################################################################### taskmanager_1 | 2020-11-15 17:46:53,438 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:5, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: e5137050c0f1ef5e660311ddf1f3429f, jobId: ba4e3974860af7dc00a28fdfbb44fe06). taskmanager_1 | 2020-11-15 17:46:53,440 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 541ad3e383fb9c024141f2bab5e8b7fd, jobId: ba4e3974860af7dc00a28fdfbb44fe06). taskmanager_1 | 2020-11-15 17:46:53,442 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: ef8adb7d879f4072123fe4bc12054c0c, jobId: ba4e3974860af7dc00a28fdfbb44fe06). taskmanager_1 | 2020-11-15 17:46:53,444 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:4, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: db5d62b8c9fe8172fc1883c148b150e8, jobId: ba4e3974860af7dc00a28fdfbb44fe06). taskmanager_1 | 2020-11-15 17:46:53,846 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: 637d053a0726548c2bc9261fc0e55414, jobId: ba4e3974860af7dc00a28fdfbb44fe06). taskmanager_1 | 2020-11-15 17:46:53,849 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:3, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=219.333mb (229987662 bytes), taskOffHeapMemory=166.667mb (174762666 bytes), managedMemory=342.933mb (359591667 bytes), networkMemory=85.733mb (89897916 bytes)}, allocationId: cfaa8633b9102e3a509cfc94dd97d38f, jobId: ba4e3974860af7dc00a28fdfbb44fe06). taskmanager_1 | 2020-11-15 17:46:53,851 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job ba4e3974860af7dc00a28fdfbb44fe06 from job leader monitoring. taskmanager_1 | 2020-11-15 17:46:53,851 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job ba4e3974860af7dc00a28fdfbb44fe06. taskmanager_1 | 2020-11-15 17:46:54,371 ERROR org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.rejectedExecution [] - Failed to submit a listener notification task. Event loop shut down? taskmanager_1 | java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/util/concurrent/GlobalEventExecutor$2 taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.startThread(GlobalEventExecutor.java:227) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor.execute(GlobalEventExecutor.java:215) ~[blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:841) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:498) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:96) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1089) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [blob_p-2cc5d5ac59c7842f512002d81251a3cbfed058cc-e14fe009bc07ddff407ea4c5d74bd4be:1.12.0] taskmanager_1 | at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275] taskmanager_1 | Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p26p0.io.netty.util.concurrent.GlobalEventExecutor$2 taskmanager_1 | at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_275] taskmanager_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_275] taskmanager_1 | at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63) ~[flink-dist_2.11-1.12.0.jar:1.12.0] taskmanager_1 | at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0] taskmanager_1 | at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49) ~[flink-dist_2.11-1.12.0.jar:1.12.0] taskmanager_1 | at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_275] taskmanager_1 | ... 11 more ################################################################### Logfiles for 3.
large-data-set-taskmanager.log (366K) Download Attachment large-data-set-jobmanager.log (151K) Download Attachment large-data-set-executor.log (14K) Download Attachment smime.p7s (4K) Download Attachment |
Hi Niklas,
I think you are right that Kafka still doesn't support batch and there is no ES source for now. Another option is you could load the data into a connector which supports batch. Not sure if anybody else has a better idea about this.
PyFlink comes with the built-in jars such as flink-python_2.11-1.12.0.jar, flink-dist_2.11-1.12.0.jar, etc and so you don't need to manually add them(also shouldn't do that). Could you remove the duplicate jars and try it again?
I found one similar issue at Beam side: https://issues.apache.org/jira/browse/BEAM-6258 which has been resolved long time ago. I'm still trying to reproduce this issue and will let you know if there is any progress. (Would be great if you could help to provide an example which could easily reproduce this issue)
Currently, Pandas UDAF still doesn't support complex type and so I'm afraid that you have to put multiple values into a single VARCHAR for now. Regards, Dian
|
Hi Niklas, Regarding `Exception in thread "grpc-nio-worker-ELG-3-2" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1`, it does not affect the correctness of the result. The reason is that some resources are released asynchronously when Grpc Server is shut down[1] . After the UserClassLoader unloads the class, the asynchronous thread tries to release the resources and throw NotClassFoundException, but the content of the data result has been sent downstream, so the correctness of the result will not be affected. Regarding the details of the specific causes, I have explained in the flink community[2] and the beam community[3], and fixed them in the flink community. There will be no such problem in the next version of release 1.11.3 and 1.12.0. [1] https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/SharedResourceHolder.java#L150 [2] https://issues.apache.org/jira/browse/FLINK-20284 [3] https://issues.apache.org/jira/browse/BEAM-5397 Best, Xingbo Dian Fu <[hidden email]> 于2020年11月16日周一 下午9:10写道:
|
Hi Xingbo,
thanks for taking care and letting me know. I was about to share an example, how to reproduce this. Now I will wait for the next release candidate and give it a try. Regards, Niklas -- [hidden email] Mobile: +49 160 9793 2593 Office: +49 40 2380 6523 Simon-von-Utrecht-Straße 85a 20359 Hamburg UNIBERG GmbH Registergericht: Amtsgericht Kiel HRB SE-1507 Geschäftsführer: Andreas Möller, Martin Ulbricht Information Art. 13 DSGVO B2B: Für die Kommunikation mit Ihnen verarbeiten wir ggf. Ihre personenbezogenen Daten. Alle Informationen zum Umgang mit Ihren Daten finden Sie unter https://www.uniberg.com/impressum.html.
smime.p7s (4K) Download Attachment |
Hi Niklas, Thanks a lot for supporting PyFlink. In fact, your requirement for multiple input and multiple output is essentially Table Aggregation Functions[1]. Although PyFlink does not support it yet, we have listed it in the release 1.13 plan. In addition, row-based operations[2] that are very user-friendly to machine learning users are also included in the 1.13 plan. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#table-aggregation-functions [2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations Best, Xingbo Niklas Wilcke <[hidden email]> 于2020年11月26日周四 下午5:11写道:
|
Hi Xingbo,
thanks for sharing. This is very interesting. Regards, Niklas
smime.p7s (4K) Download Attachment |
Free forum by Nabble | Edit this page |