Hi,
I am trying to submit a PyFlink job in detached mode using the command:

`../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j flink-sql-connector-kafka_2.11-1.11.0.jar`

The jobs are submitted successfully but the command does not return. I realized that was because I had the following line in basic_streaming_job.py:

`ten_sec_summaries.get_job_client().get_job_execution_result().result()`

This statement is useful when testing locally within a MiniCluster (using `python basic_streaming_job.py`), but it is not needed when the job is submitted to a cluster.

So I would like to programmatically detect whether the StreamExecutionEnvironment is a LocalStreamEnvironment and execute the above snippet accordingly. How do I do this?

Thanks,
Manas
Hi Manas,

When running locally, you need `ten_sec_summaries.get_job_client().get_job_execution_result().result()` to wait for the job to finish. However, when you submit to the cluster, you need to delete this code.

In my opinion, the only feasible solution at the moment is to maintain two versions of the code, annoying as that is. After all, running jobs locally is usually done for testing, so keeping a separate version for that should be acceptable.

In the long run, the Flink framework should adapt its behavior to the environment it runs in, so that users don't need to prepare different code.

Best,
Xingbo

Manas Kale <[hidden email]> wrote on Tue, Sep 1, 2020 at 3:00 PM:
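As a rough sketch of how a single script could cover both cases without the framework's help (this is not part of Xingbo's reply), the blocking wait can be gated behind an explicit command-line flag. The `--wait` flag and the `build_pipeline` helper below are hypothetical placeholders; `ten_sec_summaries` stands for the TableResult from the job, as in the first message.

```
# Sketch only: keep one script and decide at runtime whether to block.
# `build_pipeline` and the `--wait` flag are hypothetical placeholders.
import sys


def build_pipeline():
    """Build the PyFlink job and return its TableResult (details omitted)."""
    raise NotImplementedError


def main(wait_for_result: bool):
    ten_sec_summaries = build_pipeline()
    if wait_for_result:
        # Local MiniCluster run: block until the job finishes.
        ten_sec_summaries.get_job_client().get_job_execution_result().result()


if __name__ == "__main__":
    # `python basic_streaming_job.py --wait` for local testing,
    # `flink run -d -py basic_streaming_job.py ...` for cluster submission.
    main(wait_for_result="--wait" in sys.argv)
```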
Hi Xingbo,

Thank you for clarifying that. I am indeed maintaining a different version of the code by commenting out those lines, but I was just wondering if it was possible to detect the environment programmatically.

Regards,
Manas

On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang <[hidden email]> wrote:
Hi Manas,

I am not entirely sure, but you might try to check whether `env._j_stream_execution_environment` is an instance of `gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment` via Python's isinstance function.

Cheers,
Till

On Wed, Sep 2, 2020 at 5:46 AM Manas Kale <[hidden email]> wrote:
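A minimal sketch of this check, assuming the internal `_j_stream_execution_environment` attribute behaves as in Flink 1.11: since py4j exposes Java objects as generic proxies, comparing the Java class name may be more dependable than a plain Python `isinstance` check.

```
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# `_j_stream_execution_environment` is the wrapped Java object (internal API).
j_env = env._j_stream_execution_environment
is_local = (j_env.getClass().getName()
            == "org.apache.flink.streaming.api.environment.LocalStreamEnvironment")
print(is_local)
```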
Hi Manas,

As Till said, you need to check whether the execution environment being used is a LocalStreamEnvironment. To do that, you need to get the class of the corresponding Java object through py4j. You can take a look at the example I wrote below; I hope it helps:

```
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from py4j.java_gateway import get_java_class


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())
    gateway = get_gateway()

    # get the execution environment class
    env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()

    # get the LocalStreamEnvironment class
    local_stream_environment_class = get_java_class(
        gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)

    print(env_class == local_stream_environment_class)


if __name__ == '__main__':
    test()
```

Best,
Xingbo

Till Rohrmann <[hidden email]> wrote on Wed, Sep 2, 2020 at 5:03 PM:
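To connect this back to the original question, here is a follow-up sketch (not from the thread) of how the comparison from Xingbo's example could gate the blocking wait in basic_streaming_job.py. `env_class` and `local_stream_environment_class` are taken from the example above, and `ten_sec_summaries` stands for the TableResult produced by the job, as in the first message.

```
# Continuation of the example above (env_class and
# local_stream_environment_class are defined there).
is_local = env_class == local_stream_environment_class

if is_local:
    # Local MiniCluster run (`python basic_streaming_job.py`):
    # block until the job finishes.
    ten_sec_summaries.get_job_client().get_job_execution_result().result()
# When submitted with `flink run -d`, skip the wait so the command returns.
```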
Hi Xingbo and Till,

Thank you for your help!

On Wed, Sep 2, 2020 at 9:38 PM Xingbo Huang <[hidden email]> wrote: