AsyncFunction used twice with AsyncDataStream.unorderedWait - 2nd function's asyncInvoke not getting called

Vijay Balakrishnan
Hi,

I have two AsyncFunctions, SampleCopyAsyncFunction and SampleSinkAsyncFunction, each called with AsyncDataStream.unorderedWait. The first one, SampleCopyAsyncFunction.asyncInvoke, gets called properly, but the second one, SampleSinkAsyncFunction.asyncInvoke, never gets called (although its open and close methods are). Is there any way to get the second asyncInvoke called? I use an Executors.newFixedThreadPool(..) within each AsyncFunction.
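A stripped-down model of the setup described above may help. It shows an async function that creates a fixed thread pool in open(), hands each element to that pool in asyncInvoke, and shuts the pool down in close(). This is a stdlib-only sketch, not Flink code. ResultSink, PooledAsyncFunction and runOperator are hypothetical stand-ins for Flink's ResultFuture, a RichAsyncFunction and the async operator, and the upper-casing is placeholder work. The sketch illustrates one point: asyncInvoke runs once per element that reaches the operator. An instance that receives no elements therefore still goes through open and close without ever calling asyncInvoke.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncLifecycleSketch {

    // Hypothetical stand-in for Flink's ResultFuture<OUT>.
    interface ResultSink<T> {
        void complete(Collection<T> result);
    }

    // Hypothetical stand-in for a RichAsyncFunction that owns a fixed thread pool.
    static class PooledAsyncFunction {
        private ExecutorService pool;
        final AtomicInteger openCalls = new AtomicInteger();
        final AtomicInteger invokeCalls = new AtomicInteger();
        final AtomicInteger closeCalls = new AtomicInteger();

        void open(int nThreads) {
            openCalls.incrementAndGet();
            pool = Executors.newFixedThreadPool(nThreads);
        }

        // Called once per incoming element; the work itself runs on the pool.
        void asyncInvoke(String input, ResultSink<String> sink) {
            invokeCalls.incrementAndGet();
            CompletableFuture
                    .supplyAsync(() -> input.toUpperCase(), pool)
                    .thenAccept(r -> sink.complete(Collections.singletonList(r)));
        }

        // Stops the pool and waits for in-flight work (and its callbacks) to finish.
        void close() {
            closeCalls.incrementAndGet();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Simulates one operator instance: open, one asyncInvoke per element, close.
    static List<String> runOperator(List<String> elements, PooledAsyncFunction fn) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        fn.open(2);
        for (String e : elements) {
            fn.asyncInvoke(e, out::addAll);
        }
        fn.close();
        return out;
    }

    public static void main(String[] args) {
        PooledAsyncFunction busy = new PooledAsyncFunction();
        List<String> results = runOperator(Arrays.asList("a", "b", "c"), busy);
        System.out.println("with input: invokes=" + busy.invokeCalls + ", results=" + results.size());

        // No upstream elements: open() and close() still run, asyncInvoke never does.
        PooledAsyncFunction idle = new PooledAsyncFunction();
        runOperator(Collections.<String>emptyList(), idle);
        System.out.println("no input: open=" + idle.openCalls + ", invokes=" + idle.invokeCalls
                + ", close=" + idle.closeCalls);
    }
}
```

In this model, the question "why is asyncInvoke never called?" reduces to "why does nothing reach this operator instance?", so the upstream operators are the place to look.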

TIA

Here is the code:

AsyncFunction<CameraWithCube, CameraWithCube> cameraWithCubeAsyncFunction =
        new SampleCopyAsyncFunction(shutdownWaitTS, inputFile, options, nThreads);

DataStream<CameraWithCube> cameraWithCubeDataStreamAsync =
        AsyncDataStream.unorderedWait(keyedByCamCameraStream, cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
                .setParallelism(parallelCamTasks); //.startNewChain()

DataStream<CameraWithCube> cameraWithCubeDataStream = cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
        cameraWithCube.cameraKey.getTs() : new Object());

String uuid = UUID.randomUUID().toString();

DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeed = inputMetadataDataStream
        .connect(cameraWithCubeDataStream)
        .flatMap(new SyncLatchFunction(outputFile, outputPath, uuid))
        .uid("connect2Streams")
        .setParallelism(1);

AsyncFunction<Tuple2<InputMetadata, CameraWithCube>, Tuple2<InputMetadata, CameraWithCube>> cubeSinkAsyncFunction =
        new SampleSinkAsyncFunction(shutdownWaitTS, outputPath, options, nThreads, uuid);

DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsync =
        AsyncDataStream.unorderedWait(enrichedCameraFeed, cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
                .setParallelism(parallelCubeTasks)
                .uid("Read-Image-Async"); // <== asyncInvoke never gets called for 2nd AsyncFunction

DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsyncDataStream = enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey != null ?
        tuple2.f0.inputMetadataKey.getTs() : new Object());

//enrichedCameraFeedSinkAsyncDataStream.print(); // <== this doesn't work

enrichedCameraFeedSinkAsyncDataStream.addSink(new CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS
        .setParallelism(parallelCubeTasks)
        .uid("Cube-Sink");
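Both keyBy selectors in the code above fall back to new Object() when the key field is null. This is a separate issue from the asyncInvoke question. Flink expects key types to have a deterministic hashCode(), but a fresh Object only has an identity hash. The same record can therefore map to a different parallel instance each time its key is computed. The following plain-Java sketch models this. Reading, partitionFor and the -1L sentinel are hypothetical. partitionFor is also simplified: Flink additionally hashes hashCode() into key groups, but a key that is not stable stays unstable either way.

```java
import java.util.function.Function;

public class KeyStabilitySketch {

    // Hypothetical record standing in for CameraWithCube; ts may be null like cameraKey.
    static final class Reading {
        final Long ts;
        Reading(Long ts) { this.ts = ts; }
    }

    // Simplified hash partitioning: the chosen instance depends only on key.hashCode().
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Fallback as in the post: a new Object each call, so its hash differs per call.
    static final Function<Reading, Object> UNSTABLE = r -> r.ts != null ? r.ts : new Object();

    // Deterministic alternative (hypothetical): a fixed sentinel key for missing timestamps.
    static final Function<Reading, Object> STABLE = r -> r.ts != null ? r.ts : Long.valueOf(-1L);

    public static void main(String[] args) {
        Reading missing = new Reading(null);
        int parallelism = 8;
        System.out.println("stable:   " + partitionFor(STABLE.apply(missing), parallelism)
                + " / " + partitionFor(STABLE.apply(missing), parallelism));
        System.out.println("unstable: " + partitionFor(UNSTABLE.apply(missing), parallelism)
                + " / " + partitionFor(UNSTABLE.apply(missing), parallelism));
    }
}
```

A sentinel key sends all records that lack a key to one instance. That is predictable, but it can become a hotspot if such records are common.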

Re: AsyncFunction used twice with AsyncDataStream.unorderedWait - 2nd function's asyncInvoke not getting called

Vijay Balakrishnan
Hi,

It turns out the issue was the RichParallelSourceFunction I was using: it resulted in the sink not getting called after the connect with SyncLatchFunction. I still need to figure out the cause there, but both asyncInvoke functions work fine now that I have replaced the ParallelCameraSource (a RichParallelSourceFunction) with the old CheckpointedCameraWithCubeSource.
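The reported symptom was that open and close ran on the second async operator but asyncInvoke never did. This is consistent with the connect/SyncLatchFunction step never emitting anything. The post does not show SyncLatchFunction. For illustration only, the following sketch assumes it behaves like a latch that pairs metadata with camera records by timestamp and emits only when both sides have arrived. Latch, onMetadata and onCube are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LatchJoinSketch {

    // Hypothetical co-flatMap: buffers each side by timestamp and emits a pair only
    // once both the metadata and the camera record for that timestamp have arrived.
    static final class Latch {
        private final Map<Long, String> pendingMeta = new HashMap<>();
        private final Map<Long, String> pendingCube = new HashMap<>();
        final List<String> emitted = new ArrayList<>();

        void onMetadata(long ts, String meta) {
            String cube = pendingCube.remove(ts);
            if (cube != null) {
                emitted.add(meta + "+" + cube);
            } else {
                pendingMeta.put(ts, meta);
            }
        }

        void onCube(long ts, String cube) {
            String meta = pendingMeta.remove(ts);
            if (meta != null) {
                emitted.add(meta + "+" + cube);
            } else {
                pendingCube.put(ts, cube);
            }
        }
    }

    public static void main(String[] args) {
        Latch healthy = new Latch();
        healthy.onMetadata(1L, "m1");
        healthy.onCube(1L, "c1");
        System.out.println(healthy.emitted); // [m1+c1]

        // If camera records never reach this instance, the latch emits nothing,
        // so a downstream async operator never receives an element to invoke on.
        Latch starved = new Latch();
        starved.onMetadata(1L, "m1");
        System.out.println(starved.emitted); // []
    }
}
```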

DataStream<CameraWithCube> keyedByCamCameraStream = env
        .addSource(new CheckpointedCameraWithCubeSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), "TileDB Camera")
        .uid("TileDB-Camera")
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getCam() : new Object())
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube value, Context ctx, Collector<CameraWithCube> out) throws Exception {
                out.collect(value);
            }
        })
        .setParallelism(parallelCamTasks);
/*DataStream<CameraWithCube> keyedByCamCameraStream = env
        .addSource(new ParallelCameraSource(maxSeqCnt, servingSpeedMs, startTime, nbrCameras, outputFile), "TileDB Camera")
        .uid("TileDB-Camera")
        .setParallelism(parallelCamTasks)
        .partitionCustom((Partitioner<Integer>) (key, numPartitions) -> {
            return key % numPartitions;
        }, new KeySelector<CameraWithCube, Integer>() {
            @Override
            public Integer getKey(CameraWithCube cameraWithCube) throws Exception {
               ....;
            }
        });*/
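The commented-out ParallelCameraSource variant uses a custom partitioner, key % numPartitions, which may be worth checking. In Java, % keeps the sign of the dividend, so a negative key would produce a negative channel index, and that is not valid. The body of getKey is elided in the post, so it is unknown whether negative keys can actually occur. The sketch below only contrasts % with Math.floorMod, using a hypothetical CameraPartitionerSketch class.

```java
public class CameraPartitionerSketch {

    // Same arithmetic as the commented-out lambda: key % numPartitions.
    static int remainderPartition(int key, int numPartitions) {
        return key % numPartitions;
    }

    // floorMod keeps the result in [0, numPartitions) even for negative keys.
    static int floorModPartition(int key, int numPartitions) {
        return Math.floorMod(key, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (int key : new int[] {5, -3}) {
            System.out.println(key + " -> % gives " + remainderPartition(key, numPartitions)
                    + ", floorMod gives " + floorModPartition(key, numPartitions));
        }
    }
}
```

If negative keys are possible, Math.floorMod(key, numPartitions) could replace key % numPartitions inside the Partitioner lambda without changing the result for non-negative keys.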

Vijay


On Thu, Jul 26, 2018 at 10:39 PM Vijay Balakrishnan wrote the original message above.