How to assign windows dynamically after process() operator

cancobanoglu
Hi all,

I'm trying to build a streaming job that works as follows:
 
  1. Collect sensor events from a source.
  2. Collect rule events defined for a device (the device that streams the sensor events).
  3. A rule may carry window information, so aggregation can differ from device to device.
  4. When a rule with window info arrives for a device, create a (tumbling) window for it.
  5. When a new rule arrives without window info, remove the window and process events without a window function.

I used this as a reference:
https://techblog.king.com/rbea-scalable-real-time-analytics-king/

*My streaming code is below:*

mappedDataSource
            .connect(mappedRuleStream)
            .keyBy(..deviceId..)
            .process(new RuleProcessorFunction())
            // windowAll is a non-keyed window, so this step runs with parallelism 1
            .windowAll(new CustomTimeWindowing())
            .apply(new AllWindowFunction<ProcessedEvent, Object, TimeWindow>() {

              @Override
              public void apply(TimeWindow window, Iterable<ProcessedEvent> values,
                                Collector<Object> out) throws Exception {
                // placeholder: only shows that the window fired
                System.out.println("hello");
              }
            });

*RuleProcessorFunction:*

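import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class RuleProcessorFunction extends CoProcessFunction<SensorEvent, RuleEvent, ProcessedEvent> {

  // Per-device state: the last sensor event (f0) and the current rule (f1).
  // Note: tuples with null fields may fail to serialize; two separate
  // ValueStates would avoid that.
  private transient ValueState<Tuple2<SensorEvent, RuleEvent>> state;

  @Override
  public void processElement1(SensorEvent value, Context ctx,
                              Collector<ProcessedEvent> out) throws Exception {
    System.out.println("process element device id : " + value.deviceId);
    System.out.println("process element solution id : " + value.solutionId);

    // Read the stored rule first, so that updating the state does not wipe it.
    Tuple2<SensorEvent, RuleEvent> current = state.value();
    RuleEvent rule = (current == null) ? null : current.f1;
    state.update(Tuple2.of(value, rule));

    // execute if there is a defined rule on incoming event
  }

  @Override
  public void processElement2(RuleEvent value, Context ctx,
                              Collector<ProcessedEvent> out) throws Exception {
    System.out.println("rule stream element solId :" + value.solutionId + " devId : " + value.deviceId);

    // Store the rule; state.value() is null until the first update for this key.
    Tuple2<SensorEvent, RuleEvent> current = state.value();
    SensorEvent lastEvent = (current == null) ? null : current.f0;
    state.update(Tuple2.of(lastEvent, value));

    // The processed event carries the window information;
    // the window itself is assigned downstream.
    ProcessedEvent processedEvent = new ProcessedEvent();
    processedEvent.deviceId = value.deviceId;
    processedEvent.solutionId = value.solutionId;
    processedEvent.windowInfo = value.window;
    processedEvent.ruleId = value.ruleId;

    out.collect(processedEvent);
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Tuple2<SensorEvent, RuleEvent>> stateDescriptor =
            new ValueStateDescriptor<>("processor",
                    TypeInformation.of(new TypeHint<Tuple2<SensorEvent, RuleEvent>>() {
                    }));
    state = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
                      Collector<ProcessedEvent> out) throws Exception {
    // rule triggers
  }
}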


*CustomTimeWindowing (the custom window assigner):*

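import java.util.Collection;
import java.util.Collections;

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CustomTimeWindowing extends TumblingEventTimeWindows {
  public CustomTimeWindowing() {
    // placeholder size and offset; the real size is read from each element
    super(1, 0);
  }

  @Override
  public Collection<TimeWindow> assignWindows(Object element, long timestamp,
                                              WindowAssignerContext context) {
    System.out.println("creating window : ");
    ProcessedEvent processedEvent = (ProcessedEvent) element;
    int windowInfo = processedEvent.windowInfo;
    System.out.println("creating window  rule : " + processedEvent.ruleId);
    // windowInfo is in seconds; 1000L avoids int overflow for large values.
    // A windowInfo of 0 (rule without window) makes the modulo below throw
    // ArithmeticException.
    long size = windowInfo * 1000L;
    System.out.println("window info in milliseconds :" + size);
    long start = timestamp - (timestamp % size);
    long end = start + size;
    return Collections.singletonList(new TimeWindow(start, end));
  }
}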
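
To make the window arithmetic in assignWindows concrete, here is a minimal plain-Java sketch with no Flink dependency. The class TumblingBounds and its method names are made up for illustration, and it assumes windowInfo is a length in seconds, as in the assigner above.

```java
// Plain-Java sketch of the tumbling-window arithmetic; no Flink dependency.
// TumblingBounds and its method names are hypothetical.
final class TumblingBounds {

    private TumblingBounds() {}

    /** Converts a window length in seconds to milliseconds without int overflow. */
    static long toMillis(int windowSeconds) {
        return windowSeconds * 1000L;
    }

    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    static long start(long timestampMillis, long sizeMillis) {
        if (sizeMillis <= 0) {
            // "timestamp % 0" would throw ArithmeticException; fail with a clearer message.
            throw new IllegalArgumentException("window size must be positive: " + sizeMillis);
        }
        // floorMod keeps the result aligned for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** End (exclusive) of the same window. */
    static long end(long timestampMillis, long sizeMillis) {
        return start(timestampMillis, sizeMillis) + sizeMillis;
    }
}
```

With a 5-second window, an event at timestamp 12500 ms falls into the window from 10000 to 15000. A rule without window info (size 0) is rejected here instead of dividing by zero, so that case has to be handled before the window is computed.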

When a RuleEvent arrives, I attach the window metadata to a ProcessedEvent
and emit it through the collector so it keeps flowing downstream. But if I
also emit from processElement1 for every SensorEvent, the window assigner is
called again for each element and the window changes. I want the window to
be (re)assigned only when new or changed window info arrives.
 
Could you guide me on how to do this? What is the correct way to build this
kind of structure: managing windows manually, or using a custom window
assigner like the one above?

Another reference:
https://stackoverflow.com/questions/34596230/differences-between-working-with-states-and-windowstime-in-flink-streaming

Thanks

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/