Hi,
I have two AsyncFunctions, SampleCopyAsyncFunction and SampleSinkAsyncFunction, each applied with AsyncDataStream.unorderedWait. For the first one, SampleCopyAsyncFunction.asyncInvoke gets called properly, but for the second, SampleSinkAsyncFunction.asyncInvoke never gets called (though its open and close functions are called). Is there any way to get the second asyncInvoke called? I use an Executors.newFixedThreadPool(..) within each AsyncFunction.
TIA
Here is the code:
AsyncFunction<CameraWithCube, CameraWithCube> cameraWithCubeAsyncFunction =
        new SampleCopyAsyncFunction(shutdownWaitTS, inputFile, options, nThreads);
DataStream<CameraWithCube> cameraWithCubeDataStreamAsync =
        AsyncDataStream.unorderedWait(keyedByCamCameraStream, cameraWithCubeAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
                .setParallelism(parallelCamTasks);//.startNewChain()
DataStream<CameraWithCube> cameraWithCubeDataStream =
        cameraWithCubeDataStreamAsync.keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getTs() : new Object());
String uuid = UUID.randomUUID().toString();
DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeed = inputMetadataDataStream
        .connect(cameraWithCubeDataStream)
        .flatMap(new SyncLatchFunction(outputFile, outputPath, uuid))
        .uid("connect2Streams")
        .setParallelism(1);
AsyncFunction<Tuple2<InputMetadata, CameraWithCube>, Tuple2<InputMetadata, CameraWithCube>> cubeSinkAsyncFunction =
        new SampleSinkAsyncFunction(shutdownWaitTS, outputPath, options, nThreads, uuid);
DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsync =
        AsyncDataStream.unorderedWait(enrichedCameraFeed, cubeSinkAsyncFunction, timeout, TimeUnit.MILLISECONDS, nCapacity)
                .setParallelism(parallelCubeTasks)
                .uid("Read-Image-Async");// <== asyncInvoke never gets called for 2nd AsyncFunction
DataStream<Tuple2<InputMetadata, CameraWithCube>> enrichedCameraFeedSinkAsyncDataStream =
        enrichedCameraFeedSinkAsync.keyBy((tuple2) -> tuple2.f0.inputMetadataKey != null ?
                tuple2.f0.inputMetadataKey.getTs() : new Object());
//enrichedCameraFeedSinkAsyncDataStream.print(); <== this doesn't work
enrichedCameraFeedSinkAsyncDataStream.addSink(new CubeProcessingSink(options, outputPath, uuid)) //, shutdownWaitTS
        .setParallelism(parallelCubeTasks)
        .uid("Cube-Sink");
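For readers following along, the executor-backed pattern mentioned above (a fixed thread pool used inside each async function) can be sketched roughly as below. This is only an illustration in plain java.util.concurrent, not Flink's API and not the actual SampleCopyAsyncFunction or SampleSinkAsyncFunction code: the class AsyncPatternSketch and the methods invokeAsync and runAll are hypothetical names, and the "-processed" suffix stands in for the real work. The point it shows is that every submitted task completes its future, either with a value or with an exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncPatternSketch {

    // Hypothetical stand-in for the body of an asyncInvoke-style method:
    // hand the work to the pool and return a future that is always completed.
    static CompletableFuture<String> invokeAsync(String input, ExecutorService pool) {
        CompletableFuture<String> result = new CompletableFuture<>();
        pool.submit(() -> {
            try {
                // Placeholder for the real (blocking) work.
                result.complete(input + "-processed");
            } catch (Throwable t) {
                // Never leave the future pending if the work fails.
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    // Runs all inputs through the pool and collects results in input order.
    static List<String> runAll(List<String> inputs, int nThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String in : inputs) {
                futures.add(invokeAsync(in, pool));
            }
            List<String> out = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                // Bounded wait, analogous to the timeout passed to unorderedWait.
                out.add(f.get(5, TimeUnit.SECONDS));
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(List.of("cam1", "cam2"), 2));
    }
}
```

In a real Flink AsyncFunction the future's role is played by the result handle passed to asyncInvoke; the sketch only mirrors the submit-then-complete shape.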
Hi, it turns out the issue was with the RichParallelSourceFunction I was using: it resulted in the Sink not getting called after the connect with SyncLatchFunction. I still need to figure out the issue there, but the 2 asyncInvoke functions work fine now that I have replaced the ParallelCameraSource (a RichParallelSourceFunction) with the old CheckpointedCameraWithCubeSource.
DataStream<CameraWithCube> keyedByCamCameraStream = env
Vijay
On Thu, Jul 26, 2018 at 10:39 PM Vijay Balakrishnan <[hidden email]> wrote: