Python StreamExecutionEnvironment from_collection Kafka example

classic Classic list List threaded Threaded
16 messages Options
Reply | Threaded
Open this post in threaded view
|

Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Shuiqiang Chen
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Shuiqiang Chen

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

rmetzger0
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Xingbo Huang
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen
Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Dian Fu
Hi Robert,

1) Do you mean that when submitting the same job multiple times and it succeed sometimes and hangs sometimes or it only hangs for some specific job?
2) Which deployment mode do you use? 
3) Is it possible to dump the stack trace? It would help us understanding what’s happening.

Thanks,
Dian

2021年3月16日 下午11:51,Robert Cullen <[hidden email]> 写道:

Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490

Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen
Dian,

Thanks for your reply.  Yes, I would submit the same job in kubernetes session mode.  Sometimes the job would succeed but successive tries would fail. No stack trace, the job would never return a job id:

In this case I redeployed the cluster and the job completed ... and multiple tries were successful.


On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

1) Do you mean that when submitting the same job multiple times and it succeed sometimes and hangs sometimes or it only hangs for some specific job?
2) Which deployment mode do you use? 
3) Is it possible to dump the stack trace? It would help us understanding what’s happening.

Thanks,
Dian

2021年3月16日 下午11:51,Robert Cullen <[hidden email]> 写道:

Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Dian Fu
Does the job runs in detached mode or attached mode? Could you share some code snippets and the job submission command if possible?

Regards,
Dian

2021年3月18日 下午8:17,Robert Cullen <[hidden email]> 写道:

Dian,

Thanks for your reply.  Yes, I would submit the same job in kubernetes session mode.  Sometimes the job would succeed but successive tries would fail. No stack trace, the job would never return a job id:

In this case I redeployed the cluster and the job completed ... and multiple tries were successful.


On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

1) Do you mean that when submitting the same job multiple times and it succeed sometimes and hangs sometimes or it only hangs for some specific job?
2) Which deployment mode do you use? 
3) Is it possible to dump the stack trace? It would help us understanding what’s happening.

Thanks,
Dian

2021年3月16日 下午11:51,Robert Cullen <[hidden email]> 写道:

Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490

Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen

Dian,
The job runs in attached mode. See the rest below.

Can you supply some examples of Tumbling Time Windows and the correct json formatting for writing to a kafka topic. This snippet does not write to my topic:

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):         
result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])

Here is the job submission command:

./bin/flink run \                
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule anomaly_detection \
--pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py

Here is the code I’m running:

from typing import Any 

from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction

import logging
import json
import sys
import tad

def json_to_tuple(js, fields):
    return tuple([str(js.get(f, '')) for f in fields])

def anomaly_detection():

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # define the schema of the message from kafka, here the data is in json format.
    #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
    #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
    type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
    json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

    logging.info("Row info: %s", json_row_schema)

    # define the kafka connection properties.
    kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}

    # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
    kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
    kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)

    # set the kafka source to consume data from earliest offset.
    kafka_consumer.set_start_from_earliest()

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    # create a DataStream from kafka consumer source
    ds = env.add_source(kafka_consumer)
    ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
        .add_sink(kafka_producer)
    #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
    #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
    #    .add_sink(kafka_producer)
    env.execute("twitter_anomaly_detection")

class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
        yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark)    
        anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)

    #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
    #    yield "On timer timestamp: " + str(timestamp)

class KafkaRowTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        return int(value[0])

class MyMapFunction(MapFunction):

    def map(self, value):
        return value[1]

class MyKeySelector(KeySelector):

    def get_key(self, value):
        return value[0]

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    anomaly_detection()

On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <[hidden email]> wrote:
Does the job runs in detached mode or attached mode? Could you share some code snippets and the job submission command if possible?

Regards,
Dian

2021年3月18日 下午8:17,Robert Cullen <[hidden email]> 写道:

Dian,

Thanks for your reply.  Yes, I would submit the same job in kubernetes session mode.  Sometimes the job would succeed but successive tries would fail. No stack trace, the job would never return a job id:

In this case I redeployed the cluster and the job completed ... and multiple tries were successful.


On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

1) Do you mean that when submitting the same job multiple times and it succeed sometimes and hangs sometimes or it only hangs for some specific job?
2) Which deployment mode do you use? 
3) Is it possible to dump the stack trace? It would help us understanding what’s happening.

Thanks,
Dian

2021年3月16日 下午11:51,Robert Cullen <[hidden email]> 写道:

Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Dian Fu
Hi Robert,

Usually we should submit the job to a cluster in detached mode. Otherwise, it will wait until the job finishes or fails. 

Could you add the `--detached` flag during submission and try again? Or is there any specific reason to run it in attached mode?

Regards,
Dian

On Fri, Mar 19, 2021 at 10:10 PM Robert Cullen <[hidden email]> wrote:

Dian,
The job runs in attached mode. See the rest below.

Can you supply some examples of Tumbling Time Windows and the correct json formatting for writing to a kafka topic. This snippet does not write to my topic:

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):         
result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])

Here is the job submission command:

./bin/flink run \                
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule anomaly_detection \
--pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py

Here is the code I’m running:

from typing import Any 

from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction

import logging
import json
import sys
import tad

def json_to_tuple(js, fields):
    return tuple([str(js.get(f, '')) for f in fields])

def anomaly_detection():

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # define the schema of the message from kafka, here the data is in json format.
    #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
    #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
    type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
    json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

    logging.info("Row info: %s", json_row_schema)

    # define the kafka connection properties.
    kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}

    # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
    kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
    kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)

    # set the kafka source to consume data from earliest offset.
    kafka_consumer.set_start_from_earliest()

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    # create a DataStream from kafka consumer source
    ds = env.add_source(kafka_consumer)
    ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
        .add_sink(kafka_producer)
    #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
    #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
    #    .add_sink(kafka_producer)
    env.execute("twitter_anomaly_detection")

class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
        yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark)    
        anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)

    #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
    #    yield "On timer timestamp: " + str(timestamp)

class KafkaRowTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        return int(value[0])

class MyMapFunction(MapFunction):

    def map(self, value):
        return value[1]

class MyKeySelector(KeySelector):

    def get_key(self, value):
        return value[0]

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    anomaly_detection()

On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <[hidden email]> wrote:
Does the job runs in detached mode or attached mode? Could you share some code snippets and the job submission command if possible?

Regards,
Dian

2021年3月18日 下午8:17,Robert Cullen <[hidden email]> 写道:

Dian,

Thanks for your reply.  Yes, I would submit the same job in kubernetes session mode.  Sometimes the job would succeed but successive tries would fail. No stack trace, the job would never return a job id:

In this case I redeployed the cluster and the job completed ... and multiple tries were successful.


On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

1) Do you mean that when submitting the same job multiple times and it succeed sometimes and hangs sometimes or it only hangs for some specific job?
2) Which deployment mode do you use? 
3) Is it possible to dump the stack trace? It would help us understanding what’s happening.

Thanks,
Dian

2021年3月16日 下午11:51,Robert Cullen <[hidden email]> 写道:

Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Robert Cullen
Dian,

I discovered the issue.  When I redeploy the kubernetes cluster completed jobs still remain in the queue.  The Flink REST service will throw an error looking for the missing jobs and thus subsequent submission hangs.

On Mon, Mar 22, 2021 at 3:46 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

Usually we should submit the job to a cluster in detached mode. Otherwise, it will wait until the job finishes or fails. 

Could you add the `--detached` flag during submission and try again? Or is there any specific reason to run it in attached mode?

Regards,
Dian

On Fri, Mar 19, 2021 at 10:10 PM Robert Cullen <[hidden email]> wrote:

Dian,
The job runs in attached mode. See the rest below.

Can you supply some examples of Tumbling Time Windows and the correct json formatting for writing to a kafka topic. This snippet does not write to my topic:

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):         
result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])

Here is the job submission command:

./bin/flink run \                
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule anomaly_detection \
--pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py

Here is the code I’m running:

from typing import Any 

from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction

import logging
import json
import sys
import tad

def json_to_tuple(js, fields):
    return tuple([str(js.get(f, '')) for f in fields])

def anomaly_detection():

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # define the schema of the message from kafka, here the data is in json format.
    #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
    #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
    type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
    json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

    logging.info("Row info: %s", json_row_schema)

    # define the kafka connection properties.
    kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}

    # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
    kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
    kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)

    # set the kafka source to consume data from earliest offset.
    kafka_consumer.set_start_from_earliest()

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    # create a DataStream from kafka consumer source
    ds = env.add_source(kafka_consumer)
    ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
        .add_sink(kafka_producer)
    #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
    #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
    #    .add_sink(kafka_producer)
    env.execute("twitter_anomaly_detection")

class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
        yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark)    
        anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)

    #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
    #    yield "On timer timestamp: " + str(timestamp)

class KafkaRowTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        return int(value[0])

class MyMapFunction(MapFunction):

    def map(self, value):
        return value[1]

class MyKeySelector(KeySelector):

    def get_key(self, value):
        return value[0]

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    anomaly_detection()

On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <[hidden email]> wrote:
Does the job runs in detached mode or attached mode? Could you share some code snippets and the job submission command if possible?

Regards,
Dian

2021年3月18日 下午8:17,Robert Cullen <[hidden email]> 写道:

Dian,

Thanks for your reply.  Yes, I would submit the same job in kubernetes session mode.  Sometimes the job would succeed but successive tries would fail. No stack trace, the job would never return a job id:

In this case I redeployed the cluster and the job completed ... and multiple tries were successful.


On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

1) Do you mean that when submitting the same job multiple times and it succeed sometimes and hangs sometimes or it only hangs for some specific job?
2) Which deployment mode do you use? 
3) Is it possible to dump the stack trace? It would help us understanding what’s happening.

Thanks,
Dian

2021年3月16日 下午11:51,Robert Cullen <[hidden email]> 写道:

Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490
Reply | Threaded
Open this post in threaded view
|

Re: Python StreamExecutionEnvironment from_collection Kafka example

Dian Fu
Great to hear!

Regards,
Dian

On Tue, Mar 23, 2021 at 12:46 AM Robert Cullen <[hidden email]> wrote:
Dian,

I discovered the issue.  When I redeploy the kubernetes cluster completed jobs still remain in the queue.  The Flink REST service will throw an error looking for the missing jobs and thus subsequent submission hangs.

On Mon, Mar 22, 2021 at 3:46 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

Usually we should submit the job to a cluster in detached mode. Otherwise, it will wait until the job finishes or fails. 

Could you add the `--detached` flag during submission and try again? Or is there any specific reason to run it in attached mode?

Regards,
Dian

On Fri, Mar 19, 2021 at 10:10 PM Robert Cullen <[hidden email]> wrote:

Dian,
The job runs in attached mode. See the rest below.

Can you supply some examples of Tumbling Time Windows and the correct json formatting for writing to a kafka topic. This snippet does not write to my topic:

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):         
result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])

Here is the job submission command:

./bin/flink run \                
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule anomaly_detection \
--pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py

Here is the code I’m running:

from typing import Any 

from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction

import logging
import json
import sys
import tad

def json_to_tuple(js, fields):
    return tuple([str(js.get(f, '')) for f in fields])

def anomaly_detection():

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # define the schema of the message from kafka, here the data is in json format.
    #    type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
    #    type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
    type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
    json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

    logging.info("Row info: %s", json_row_schema)

    # define the kafka connection properties.
    kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}

    # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
    kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
    kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)

    # set the kafka source to consume data from earliest offset.
    kafka_consumer.set_start_from_earliest()

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    # create a DataStream from kafka consumer source
    ds = env.add_source(kafka_consumer)
    ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
        .add_sink(kafka_producer)
    #ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
    #    .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
    #    .add_sink(kafka_producer)
    env.execute("twitter_anomaly_detection")

class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = "{'msgId': {}, 'count': {}, 'timestamp': {}}".format(str(ctx.get_current_key, str(value[2]), str(ctx.timestamp())))
        yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark)    
        anomaly_detect_ts(ctx.timestamp, max_anoms=0.02, direction="both", plot=True)

    #def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
    #    yield "On timer timestamp: " + str(timestamp)

class KafkaRowTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        return int(value[0])

class MyMapFunction(MapFunction):

    def map(self, value):
        return value[1]

class MyKeySelector(KeySelector):

    def get_key(self, value):
        return value[0]

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    anomaly_detection()

On Thu, Mar 18, 2021 at 10:10 PM Dian Fu <[hidden email]> wrote:
Does the job runs in detached mode or attached mode? Could you share some code snippets and the job submission command if possible?

Regards,
Dian

2021年3月18日 下午8:17,Robert Cullen <[hidden email]> 写道:

Dian,

Thanks for your reply.  Yes, I would submit the same job in kubernetes session mode.  Sometimes the job would succeed but successive tries would fail. No stack trace, the job would never return a job id:

In this case I redeployed the cluster and the job completed ... and multiple tries were successful.


On Thu, Mar 18, 2021 at 4:36 AM Dian Fu <[hidden email]> wrote:
Hi Robert,

1) Do you mean that when submitting the same job multiple times and it succeed sometimes and hangs sometimes or it only hangs for some specific job?
2) Which deployment mode do you use? 
3) Is it possible to dump the stack trace? It would help us understanding what’s happening.

Thanks,
Dian

2021年3月16日 下午11:51,Robert Cullen <[hidden email]> 写道:

Thanks All,

I've added python and pyflink to the TM image which fixed the problem.  Now however submitting a python script to the cluster successfully is sporadic; sometimes it completes but most of the time it just hangs.  Not sure what is causing this.

On Mon, Mar 15, 2021 at 9:47 PM Xingbo Huang <[hidden email]> wrote:
Hi,

From the error message, I think the problem is no python interpreter on your TaskManager machine. You need to install a python 3.5+ interpreter on the TM machine, and this python environment needs to install pyflink (pip install apache-flink). For details, you can refer to the document[1].

Robert Cullen <[hidden email]> 于2021年3月16日周二 上午2:58写道:

Okay, I added the jars and fixed that exception. However I have a new exception that is harder to decipher:

2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
    at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
    at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
    at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
    at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
    at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 20 more

On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:
Hey,
are you sure the class is in the lib/ folder of all machines / instances, and you've restarted Flink after adding the files to lib/ ?

On Mon, Mar 15, 2021 at 3:42 PM Robert Cullen <[hidden email]> wrote:

Shuiqiang,

I added the flink-connector-kafka_2-12 jar to the /opt/flink/lib directory

When submitting this job to my flink cluster I’m getting this stack trace at runtime:

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
    ... 9 more

On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午4:01写道:
Shuiqiang, Can you include the import statements?  thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:
Hi Robert,

Kafka Connector is provided in Python DataStream API since release-1.12.0. And the documentation for it is lacking, we will make it up soon.

The following code shows how to apply KafkaConsumers and KafkaProducer:
```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
[Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(),
Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer) 

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```

Best,
Shuiqiang

Robert Cullen <[hidden email]> 于2021年3月13日周六 上午12:56写道:

I’ve scoured the web looking for an example of using a Kafka source for a DataStream in python. Can someone finish this example?

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection( KAFKA_SOURCE )
...
--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490



--
Robert Cullen
240-475-4490


--
Robert Cullen
240-475-4490