Hi,

Newbie question. What I am trying to do is the following: the CameraWithCubeSource source emits records containing tuples of (cameraNbr, TS).

1. Need to partition the data by cameraNbr.
2. Then sleep for 1 sec to simulate a heavy process in the task.
3. Then need to partition the data by TS and finally get a DataStream to connect with another DataStream.

    DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
        .setParallelism(parallelTasks)
        .setMaxParallelism(parallelTasks)
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?  // partition by cameraNbr
            cameraWithCube.cameraKey.getCam() : new Object());

    // sleep for 1 sec ???? how
    ((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
        .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
            @Override
            public void apply(String cameraKeyCam, TimeWindow timeWindow,
                              Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
                              Collector<CameraWithCube> collector) throws Exception {
                Thread.sleep(1000);
                cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));
            }
        }) // returning void here from apply ??
        // partition by TS and return DataStream
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?  // partition by TS
            cameraWithCube.cameraKey.getTS() : new Object());

TIA,
Vijay
|
Just some advice: do not use sleep to simulate a heavy task. Use real data or generated data to simulate the load instead. A sleep like this is poor practice from a software quality point of view. Furthermore, it is often forgotten and left in the code.
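
As a rough illustration of this advice, the sketch below replaces Thread.sleep with a CPU-bound loop over generated records. Everything in it is an assumption made for illustration: the class SimulatedHeavyTask, the helper heavyProcess, the record format "cam<N>,<TS>", and the choice of repeated SHA-256 hashing as the stand-in workload are hypothetical and do not come from the original job.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical stand-in for a "heavy" per-record task (not part of the original job).
class SimulatedHeavyTask {

    // Repeatedly hashes the payload so the CPU is actually busy,
    // unlike Thread.sleep, which only parks the thread.
    static byte[] heavyProcess(String payload, int rounds) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] state = payload.getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < rounds; i++) {
            // digest(byte[]) hashes the input and resets the digest for the next round
            state = digest.digest(state);
        }
        return state;
    }

    public static void main(String[] args) throws Exception {
        int rounds = 200_000; // assumed tuning knob: raise or lower to change the cost per record
        long start = System.nanoTime();
        for (int cam = 1; cam <= 5; cam++) {
            // Generated record in an assumed "cameraNbr,TS" format
            String record = "cam" + cam + "," + (1000L * cam);
            byte[] result = heavyProcess(record, rounds);
            System.out.println(record + " -> " + result.length + " bytes");
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Processed 5 generated records in " + elapsedMs + " ms");
    }
}
```

Inside a Flink job, a helper like this could be called from the function body where the sleep currently sits. The work then scales with the number of records and consumes real CPU, so the effect of parallelism becomes visible.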
|
Hi,

This worked out after looking at https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1

Why can't I use setParallelism after keyBy? Is it not an operator?

    DataStream<CameraWithCube> cameraWithCubeDataStream = env

TIA,
Vijay

On Wed, May 16, 2018 at 1:41 PM Jörn Franke <[hidden email]> wrote:
|