Hi all,
I'm trying to build a streaming job that works like this:

1. Collect sensor events from a source.
2. Collect rule events defined for a device (the device that streams the sensor events).
3. Rules may carry window information for aggregation, and it can differ from device to device.
4. When a rule with window info is seen in the stream for a device, create a (tumbling) window.
5. If a new rule arrives without window info, remove the window and process without a window function.

I took this as a reference: https://techblog.king.com/rbea-scalable-real-time-analytics-king/

*My streaming code is as follows:*

    mappedDataSource
        .connect(mappedRuleStream)
        .keyBy(..deviceId..)
        .process(new RuleProcessorFunction())
        .windowAll(new CustomTimeWindowing())
        .apply(new AllWindowFunction<ProcessedEvent, Object, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<ProcessedEvent> values, Collector<Object> out) throws Exception {
                System.out.println("hello");
            }
        });

*RuleProcessorFunction is:*

    public class RuleProcessorFunction extends CoProcessFunction<SensorEvent, RuleEvent, ProcessedEvent> {

        private transient ValueState<Tuple2<SensorEvent, RuleEvent>> state;

        @Override
        public void processElement1(SensorEvent value, Context ctx, Collector<ProcessedEvent> out) throws Exception {
            System.out.println("process element device id : " + value.deviceId);
            System.out.println("process element solution id : " + value.solutionId);
            state.update(Tuple2.of(value, null));
            RuleEvent rule = state.value().f1;
            // execute if there is a defined rule on incoming event
        }

        @Override
        public void processElement2(RuleEvent value, Context ctx, Collector<ProcessedEvent> out) throws Exception {
            System.out.println("rule stream element solId :" + value.solutionId + " devId : " + value.deviceId);
            state.value().f1 = value; // store rule in memory

            // the processed event carries the window information; window assignment happens downstream
            ProcessedEvent processedEvent = new ProcessedEvent();
            processedEvent.deviceId = value.deviceId;
            processedEvent.solutionId = value.solutionId;
            processedEvent.windowInfo = value.window;
            processedEvent.ruleId = value.ruleId;
            out.collect(processedEvent);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<SensorEvent, RuleEvent>> stateDescriptor =
                new ValueStateDescriptor<>("processor",
                    TypeInformation.of(new TypeHint<Tuple2<SensorEvent, RuleEvent>>() { }));
            state = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<ProcessedEvent> out) throws Exception {
            // rule triggers
        }
    }

*The custom window assigner (CustomTimeWindowing) is:*

    public class CustomTimeWindowing extends TumblingEventTimeWindows {

        public CustomTimeWindowing() {
            super(1, 0);
        }

        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            System.out.println("creating window : ");
            io.iven.stream.processing.ProcessedEvent processedEvent = (ProcessedEvent) element;
            int windowInfo = processedEvent.windowInfo;
            System.out.println("creating window rule : " + processedEvent.ruleId);
            long size = windowInfo * 1000L; // windowInfo is in seconds
            System.out.println("window info in milisecond :" + size);
            long start = timestamp - (timestamp % size);
            long end = start + size;
            return Collections.singletonList(new TimeWindow(start, end));
        }
    }

When a RuleEvent arrives, I attach the window info as metadata and emit it through the collector so that it keeps flowing downstream. But if I do the same in processElement1 for each SensorEvent, the window assigner is called again for every element and the window changes. I want the assigner to be entered only when new or changed window info arrives.

Could you guide me on how to do this? What is the correct way to build this kind of structure: managing windows manually, or using a custom window assigner like this one?
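To make the window arithmetic in my assigner easier to discuss outside Flink, here is a minimal plain-Java sketch of it. The names `WindowBounds`, `Bounds` and `boundsFor` are hypothetical and only for illustration. Treating a window size of zero or less as "rule has no window info" is my own assumption, and I use `Math.floorMod` instead of `%` so that negative timestamps also align to the window grid.

```java
import java.util.Optional;

final class WindowBounds {

    /** Start (inclusive) and end (exclusive) of a window, in epoch milliseconds. */
    static final class Bounds {
        final long start;
        final long end;

        Bounds(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * Returns the tumbling window that contains timestampMillis.
     * Assumption: windowSeconds <= 0 means the rule carries no window info,
     * so no window is produced.
     */
    static Optional<Bounds> boundsFor(long timestampMillis, int windowSeconds) {
        if (windowSeconds <= 0) {
            return Optional.empty();
        }
        long size = windowSeconds * 1000L; // long arithmetic avoids int overflow
        long start = timestampMillis - Math.floorMod(timestampMillis, size);
        return Optional.of(new Bounds(start, start + size));
    }

    public static void main(String[] args) {
        Bounds b = boundsFor(12_345L, 5).get();
        System.out.println(b.start + " .. " + b.end); // prints "10000 .. 15000"
    }
}
```

With this shape, a rule without window info simply yields no window, which is the case my current assigner does not handle (a size of 0 would make the modulo fail).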
Another reference: https://stackoverflow.com/questions/34596230/differences-between-working-with-states-and-windowstime-in-flink-streaming

Thanks