Great to hear!
Regards,
Dian

On Tue, Mar 23, 2021 at 12:46 AM Robert Cullen <[hidden email]> wrote:

Dian,
I discovered the issue. When I redeploy the Kubernetes cluster, completed jobs still remain in the queue. The Flink REST service throws an error looking for the missing jobs, and subsequent submissions therefore hang.
Hi Robert,
Usually you should submit the job to a cluster in detached mode; otherwise, the client waits until the job finishes or fails.
Could you add the `--detached` flag during submission and try again? Or is there any specific reason to run it in attached mode?
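For example, a sketch that just adds the flag to the submission command you posted below (cluster-id and namespace are taken from that command):

./bin/flink run \
  --detached \
  --target kubernetes-session \
  -Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
  --pyModule anomaly_detection \
  --pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py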
Regards, Dian
On Fri, Mar 19, 2021 at 10:10 PM Robert Cullen <[hidden email]> wrote:

Dian,

The job runs in attached mode. See the rest below.
Can you supply some examples of tumbling time windows and the correct JSON formatting for writing to a Kafka topic? This snippet does not write to my topic:
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
    result = "{{'msgId': {}, 'count': {}, 'timestamp': {}}}".format(
        str(ctx.get_current_key()), str(value[2]), str(ctx.timestamp()))
    yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
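Two likely problems with the snippet: the format string renders a Python-style string with single quotes, which is not valid JSON, and json_to_tuple expects a dict but receives a string. A sketch of an alternative (assuming the .process(...) call declares output_type=Types.STRING() and the sink uses SimpleStringSchema):

import json

def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
    # build the record as a dict, then serialize it to valid JSON (double-quoted keys)
    record = {'msgId': str(ctx.get_current_key()),
              'count': str(value[2]),
              'timestamp': str(ctx.timestamp())}
    yield json.dumps(record)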
Here is the job submission command:
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule anomaly_detection \
--pyFiles /opt/flink-1.12.0/examples/anomaly_detection.py
Here is the code I’m running:
from typing import Any
from pyflink.common import Duration
from pyflink.common.serialization import SimpleStringSchema, JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.functions import KeyedProcessFunction, KeySelector, MapFunction
import logging
import json
import sys
import tad
def json_to_tuple(js, fields):
    return tuple([str(js.get(f, '')) for f in fields])
def anomaly_detection():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

    # define the schema of the message from kafka, here the data is in json format.
    # type_info = Types.ROW([Types.STRING(), Types.STRING(), Types.ROW([Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()]), Types.INT(), Types.INT()])
    # type_info = Types.ROW_NAMED(['msg_id', 'new_count', 'new_count_total', 'old_count', 'old_count_total', 'score'], [Types.STRING(), Types.INT(), Types.INT(), Types.INT(), Types.INT(), Types.STRING()])
    type_info = Types.ROW_NAMED(["msg_id", "hostname", "count"], [Types.STRING(), Types.STRING(), Types.INT()])
    json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
    logging.info("Row info: %s", json_row_schema)

    # define the kafka connection properties.
    kafka_props = {'bootstrap.servers': 'kafka-cp-kafka-headless:9092'}

    # create the KafkaConsumer and KafkaProducer with the specified topic name, serialization/deserialization schema and properties.
    kafka_consumer = FlinkKafkaConsumer("prometheus-output", json_row_schema, kafka_props)
    kafka_producer = FlinkKafkaProducer("prometheus-sink", SimpleStringSchema(), kafka_props)

    # set the kafka source to consume data from earliest offset.
    kafka_consumer.set_start_from_earliest()

    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()

    # create a DataStream from kafka consumer source
    ds = env.add_source(kafka_consumer)
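    # note: watermark_strategy above is created but never applied to the stream,
    # so event time will not actually take effect; an untested sketch of the
    # missing wiring, using the KafkaRowTimestampAssigner defined below:
    # ds = ds.assign_timestamps_and_watermarks(
    #     watermark_strategy.with_timestamp_assigner(KafkaRowTimestampAssigner()))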
    ds.map(lambda x: json.dumps({"msgId": x[0], "hostname": x[1], "count": x[2]}), output_type=Types.STRING()) \
        .add_sink(kafka_producer)

    # ds.key_by(lambda x: x[0], key_type_info=Types.STRING()) \
    #     .process(MyProcessFunction(), output_type=Types.TUPLE([Types.STRING(), Types.INT(), Types.LONG()])) \
    #     .add_sink(kafka_producer)

    env.execute("twitter_anomaly_detection")
class MyProcessFunction(KeyedProcessFunction):

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        result = "{{'msgId': {}, 'count': {}, 'timestamp': {}}}".format(
            str(ctx.get_current_key()), str(value[2]), str(ctx.timestamp()))
        yield json_to_tuple(result, ['msgId', 'count', 'timestamp'])
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark)
        tad.anomaly_detect_ts(ctx.timestamp(), max_anoms=0.02, direction="both", plot=True)

    # def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
    #     yield "On timer timestamp: " + str(timestamp)
class KafkaRowTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value: Any, record_timestamp: int) -> int:
        return int(value[0])

class MyMapFunction(MapFunction):
    def map(self, value):
        return value[1]

class MyKeySelector(KeySelector):
    def get_key(self, value):
        return value[0]
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    anomaly_detection()
Does the job run in detached mode or attached mode? Could you share some code snippets and the job submission command if possible?
Regards, Dian
Dian,
Thanks for your reply. Yes, I would submit the same job in Kubernetes session mode. Sometimes the job would succeed, but successive tries would fail. No stack trace; the job would never return a job ID:
In this case I redeployed the cluster and the job completed ... and multiple tries were successful.
Hi Robert,
1) Do you mean that when submitting the same job multiple times it succeeds sometimes and hangs sometimes, or does it only hang for some specific job?
2) Which deployment mode do you use?
3) Is it possible to dump the stack trace (e.g. as sketched below)? It would help us understand what's happening.
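For 3), a sketch of one way to capture the client-side stack while the submission hangs (this assumes a JDK with jps/jstack available on the machine where you run ./bin/flink run; the output file name is arbitrary):

# find the PID of the hanging Flink CLI client, then dump its threads
jps -l | grep org.apache.flink.client.cli.CliFrontend
jstack <client-pid> > client-stack.txt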
Thanks, Dian
Thanks All,
I've added Python and PyFlink to the TM image, which fixed the problem. Now, however, successfully submitting a Python script to the cluster is sporadic; sometimes it completes, but most of the time it just hangs. Not sure what is causing this.
Hi,

From the error message, I think the problem is that there is no Python interpreter on your TaskManager machine. You need to install a Python 3.5+ interpreter on the TM machine, and that Python environment needs to have PyFlink installed (pip install apache-flink). For details, you can refer to the documentation[1].
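As a sketch, something along these lines in the TaskManager image build (the base tag and pinned PyFlink version are assumptions; match them to your Flink 1.12.0 deployment):

FROM flink:1.12.0
# install a Python 3 interpreter and expose it as "python",
# the executable name the TaskManager tries to launch
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip && \
    ln -s /usr/bin/python3 /usr/bin/python
# install PyFlink into that interpreter
RUN pip3 install apache-flink==1.12.0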
Okay, I added the jars and fixed that exception. However, I have a new exception that is harder to decipher:
2021-03-15 14:46:20
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:181)
at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:141)
at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:181)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:340)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:259)
at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:512)
at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:522)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:262)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:123)
at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:198)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
at java.lang.ProcessImpl.start(ProcessImpl.java:134)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
... 20 more
On Mon, Mar 15, 2021 at 10:49 AM Robert Metzger <[hidden email]> wrote:

Hey, are you sure the class is in the lib/ folder of all machines / instances, and that you've restarted Flink after adding the files to lib/?
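One quick way to verify on Kubernetes (a sketch; <taskmanager-pod> is a placeholder, and the cmdaa namespace is taken from the submission command earlier in this thread):

# confirm the connector jar is present on every TaskManager pod
kubectl -n cmdaa get pods
kubectl -n cmdaa exec <taskmanager-pod> -- ls /opt/flink/lib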
Shuiqiang, I added the flink-connector-kafka_2.12 jar to the /opt/flink/lib directory. When submitting this job to my Flink cluster I'm getting this stack trace at runtime:
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.GeneratedMethodAccessor71.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:614)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:63)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:63)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:49)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:310)
... 9 more
On Sat, Mar 13, 2021 at 12:13 AM Shuiqiang Chen <[hidden email]> wrote:
Shuiqiang, can you include the import statements? Thanks.
On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <[hidden email]> wrote:

Hi Robert,
The Kafka connector has been provided in the Python DataStream API since release 1.12.0. The documentation for it is still lacking; we will fill it in soon.
The following code shows how to apply FlinkKafkaConsumer and FlinkKafkaProducer:

```
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

# define the schema of the message from kafka, here the data is in json format.
type_info = Types.ROW_NAMED(
    ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
    [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()

# define the kafka connection properties.
kafka_props = {'bootstrap.servers': 'localhost:9092', 'group.id': 'pyflink-e2e-source'}

# create the KafkaConsumer and KafkaProducer with the specified topic name,
# serialization/deserialization schema and properties.
kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)

# set the kafka source to consume data from earliest offset.
kafka_consumer.set_start_from_earliest()

# create a DataStream from kafka consumer source
ds = env.add_source(kafka_consumer)

result_stream = ...

# write the result into kafka by a kafka producer sink.
result_stream.add_sink(kafka_producer)
```
Best, Shuiqiang
I've scoured the web looking for an example of using a Kafka source for a DataStream in Python. Can someone finish this example?
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection(KAFKA_SOURCE)
...
--
Robert Cullen
240-475-4490