Pyflink tutorial output

Robert Cullen

I’m running this script taken from the Flink website: tutorial.py

python tutorial.py
from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # bounded source of two (INT, STRING) rows
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    # write each row as a line of text into part files under /tmp/output
    ds.add_sink(StreamingFileSink
                .for_row_format('/tmp/output', SimpleStringEncoder())
                .build())
    env.execute("tutorial_job")

if __name__ == '__main__':
    tutorial()

It correctly writes a part file to the /tmp/output directory when I run it locally. However, when I run it on my Kubernetes session cluster, there is no output. Any ideas?

./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=flink-jobmanager -Dkubernetes.namespace=cmdaa \
--pyModule tutorial \
--pyFiles /opt/flink-1.12.0/examples/tutorial.py \
--detached
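For checking the local run, a small helper like the one below can print whatever the sink wrote under /tmp/output. It is a minimal sketch, not part of the Flink tutorial: the function name `collect_part_files` is hypothetical, it walks every subdirectory (in case the sink places files in bucket subdirectories), and it includes dot-prefixed files on the assumption that unfinished part files may be hidden.

```python
import os
from typing import Dict


def collect_part_files(root: str) -> Dict[str, str]:
    """Hypothetical helper: map each file under `root` (as a relative path) to its text.

    Walks all subdirectories and includes dot-prefixed (hidden) files.
    Returns an empty dict if `root` does not exist, because os.walk
    silently ignores a missing directory by default.
    """
    found: Dict[str, str] = {}
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            with open(path, encoding="utf-8", errors="replace") as f:
                found[os.path.relpath(path, root)] = f.read()
    return found
```

For example, `for path, text in sorted(collect_part_files("/tmp/output").items()): print(path, text)` lists each file next to its contents. Note that it only sees the filesystem of the machine it runs on.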
--
Robert Cullen
240-475-4490

Re: Pyflink tutorial output

Shuiqiang Chen
Hi Robert,

Have you checked the /tmp/output directory inside the TaskManager pods on your Kubernetes cluster? The StreamingFileSink creates the output directory on the host of the TaskManager where the sink tasks are executed.
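One way to look inside those pods from a machine with kubectl access is sketched below. It reuses the namespace and cluster-id from the submit command and assumes the TaskManager pods carry the native Kubernetes labels `app=<cluster-id>` and `component=taskmanager`; check the actual labels with `kubectl get pods --show-labels` before relying on it. All function and constant names are hypothetical.

```python
import subprocess
from typing import List

# Assumed values, taken from the submit command; adjust for your cluster.
NAMESPACE = "cmdaa"
CLUSTER_ID = "flink-jobmanager"
OUTPUT_DIR = "/tmp/output"


def taskmanager_pods_cmd(namespace: str, cluster_id: str) -> List[str]:
    """kubectl command listing TaskManager pods as 'pod/<name>' lines.

    The label selector is an assumption about how the pods are labelled.
    """
    return ["kubectl", "get", "pods", "-n", namespace,
            "-l", f"app={cluster_id},component=taskmanager", "-o", "name"]


def list_output_cmd(namespace: str, pod: str, path: str) -> List[str]:
    """kubectl command listing every regular file under `path` inside one pod."""
    return ["kubectl", "exec", "-n", namespace, pod, "--",
            "find", path, "-type", "f"]


def show_output() -> None:
    """Print each TaskManager pod followed by the files it holds under OUTPUT_DIR."""
    result = subprocess.run(taskmanager_pods_cmd(NAMESPACE, CLUSTER_ID),
                            capture_output=True, text=True, check=True)
    for pod in result.stdout.split():
        print(f"== {pod}")
        # check=False: a pod that never ran a sink task may have no OUTPUT_DIR
        subprocess.run(list_output_cmd(NAMESPACE, pod, OUTPUT_DIR), check=False)
```

Calling `show_output()` prints each pod name followed by the `find` output from that pod, so it is easy to see which TaskManager actually holds the part files.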

Best,
Shuiqiang

In reply to Robert Cullen <[hidden email]>, Wed, Mar 24, 2021, 2:48 AM

Re: Pyflink tutorial output

Dian Fu
How did you check the output when submitting to the Kubernetes session cluster? I ask because the output is written to the local directory “/tmp/output” on the TaskManagers where the job is running.
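To inspect the files on the client side, they can be copied out of a TaskManager pod with `kubectl cp`. This is a minimal sketch, assuming you already know the pod name (without the `pod/` prefix) and that the container image includes `tar`, which `kubectl cp` relies on; the helper names and the example pod name are hypothetical.

```python
import subprocess
from typing import List


def copy_from_pod_cmd(namespace: str, pod: str, remote_dir: str, local_dir: str) -> List[str]:
    """Build a `kubectl cp` invocation that copies `remote_dir` out of one pod."""
    return ["kubectl", "cp", f"{namespace}/{pod}:{remote_dir}", local_dir]


def copy_output(namespace: str, pod: str,
                remote_dir: str = "/tmp/output", local_dir: str = "./output") -> None:
    """Run the copy; raises CalledProcessError if kubectl exits non-zero."""
    subprocess.run(copy_from_pod_cmd(namespace, pod, remote_dir, local_dir), check=True)
```

For example, `copy_output("cmdaa", "flink-jobmanager-taskmanager-1-1")`, where the pod name is only a placeholder. Repeat the copy for each TaskManager that ran a sink task, since each one holds only its own part files.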

Regards,
Dian

In reply to Robert Cullen <[hidden email]>, Mar 24, 2021, 2:40 AM

Re: Pyflink tutorial output

Robert Cullen
Ah, there they are. Thanks!

In reply to Dian Fu <[hidden email]>, Tue, Mar 23, 2021, 10:26 PM