Install/Run Streaming Anomaly Detection R package in Flink


Robert Cullen
My customer wants us to install this package in our Flink Cluster:


One of our engineers developed a python version:


Is there a way to install this in our cluster?

--
Robert Cullen
240-475-4490

Re: Install/Run Streaming Anomaly Detection R package in Flink

Roman Khachatryan
Hi,

I'm pulling in Wei Zhong and Xingbo Huang who know PyFlink better.

Regards,
Roman



Re: Install/Run Streaming Anomaly Detection R package in Flink

Wei Zhong
Hi Robert,

If you do not want to install the library on every machine in the cluster, you can use the Python dependency management API to upload the required dependencies to the cluster and use them there.

For this case, I recommend building a portable Python environment that contains all the required dependencies. You can call `add_python_archive` to upload the environment to your cluster and call `set_python_executable` to set the path of the Python interpreter in your cluster.
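The two calls described above can be sketched roughly as follows. This is a minimal illustration, not a definitive setup: the helper names `executable_in_archive` and `configure_python_env` are hypothetical, and the sketch assumes the archive is unpacked on each worker into a directory named after the archive file (or after `target_dir` when one is passed), so the interpreter path is given relative to that directory. `env` is expected to be a PyFlink `StreamExecutionEnvironment`.

```python
import posixpath


def executable_in_archive(archive_path, interpreter_in_archive, target_dir=None):
    """Build the relative path to pass to set_python_executable.

    Assumption: the uploaded archive is extracted into a directory named
    after the archive file, or after target_dir when one is given.
    """
    extract_dir = target_dir or posixpath.basename(archive_path)
    return posixpath.join(extract_dir, interpreter_in_archive)


def configure_python_env(env, archive_path, interpreter_in_archive):
    """Register the archive and point the job at the interpreter inside it.

    `env` is expected to be a PyFlink StreamExecutionEnvironment; any object
    with the same two methods works for this sketch.
    """
    env.add_python_archive(archive_path)
    env.set_python_executable(
        executable_in_archive(archive_path, interpreter_in_archive)
    )
```

For example, an archive named `venv.zip` that contains `venv/bin/python` would yield the executable path `venv.zip/venv/bin/python`.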

For more detailed information, you can refer to the following links.

Documentation of the Python dependency management API and configuration:

How to build a portable python environment:

Best,
Wei


Re: Install/Run Streaming Anomaly Detection R package in Flink

Robert Cullen

Wei,

Thank you for pointing to those examples. Here is a code sample of how it's configured for me:

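        from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic

        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
        # Upload the zipped virtual environment to the cluster.
        env.add_python_archive("/Users/admin/pyflink/venv.zip")
        # Path to the interpreter, relative to the extracted archive.
        env.set_python_executable("venv.zip/venv/bin/python")
...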

But when I run the job with this virtual environment on my cluster, I get this error:

2021-03-29 15:42:35
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to execute the command: venv.zip/venv/bin/python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: venv.zip/venv/bin/python: 1: venv.zip/venv/bin/python: Syntax error: "(" unexpected

at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:198)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.open(OneInputPythonFunctionOperator.java:126)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
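
One possible reading of this error (an assumption, not something confirmed in this thread): `Syntax error: "(" unexpected` is the kind of message a POSIX shell prints when it is handed a file the operating system cannot execute natively and falls back to parsing it as a shell script. The archive path `/Users/admin/...` looks like a macOS path, so the interpreter inside `venv.zip` may be a macOS binary that Linux workers cannot run. A minimal sketch for checking the format of the packaged interpreter by its leading bytes, where the function name `binary_format` is hypothetical:

```python
# Magic numbers for Mach-O (macOS) executables, as they appear on disk.
# Note: b"\xca\xfe\xba\xbe" (universal binary) is also used by Java class files.
MACHO_MAGICS = {
    b"\xfe\xed\xfa\xce",  # 32-bit, big-endian
    b"\xfe\xed\xfa\xcf",  # 64-bit, big-endian
    b"\xce\xfa\xed\xfe",  # 32-bit, little-endian
    b"\xcf\xfa\xed\xfe",  # 64-bit, little-endian
    b"\xca\xfe\xba\xbe",  # universal ("fat") binary
}


def binary_format(path):
    """Classify a file as ELF, Mach-O, script, or unknown by its first bytes."""
    with open(path, "rb") as f:
        magic = f.read(4)
    if magic == b"\x7fELF":
        return "ELF"  # native executable format on Linux
    if magic in MACHO_MAGICS:
        return "Mach-O"  # native executable format on macOS
    if magic[:2] == b"#!":
        return "script"  # text file with an interpreter line
    return "unknown"
```

Running `binary_format` on the extracted `venv/bin/python` would show whether it matches the platform of the cluster. If it reports `Mach-O` while the workers run Linux, rebuilding the environment on a machine that matches the cluster would be the natural next step.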






--
Robert Cullen
240-475-4490