scala cep with event time

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

scala cep with event time

孙森
Hi,Fabian

      I am using flink CEP library with event time, but there is no output( the java code performed as expected, but scala did not) .My code is here:

object EventTimeTest extends App {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input: DataStream[Event] = env.fromElements(new Event(1, "aa", DateUtils.dt2timestamp("2018-05-14 10:29:15.000000")),
new Event(1, "ab", DateUtils.dt2timestamp("2018-05-14 10:29:25.000000")),
new Event(3, "ac", DateUtils.dt2timestamp("2018-05-14 10:29:35.000000")),
new Event(4, "ad", DateUtils.dt2timestamp("2018-05-14 10:29:45.000000")),
new Event(5, "ae", DateUtils.dt2timestamp("2018-05-14 10:29:55.000000")))


input.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[Event](Time.seconds(100)) {
override def extractTimestamp(element: Event): Long = {
val ts = DateUtils.dt2long(element.getTs)
println(ts)
ts
}
}).setParallelism(1)

val partitionedInput: KeyedStream[Event, Long] = input.keyBy(event => event.getId)

val pattern: Pattern[Event, Event] = Pattern.begin("start")
.subtype(classOf[Event])
.where(_.getName.startsWith("a")).within(Time.seconds(30))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(patternSelectFun => {
val startEvent: Event = patternSelectFun("start").head
println(startEvent.getName)
startEvent.getName
})


alerts.print()

env.execute("start")

}



The java code is :



public class EventTest {

    public static void main(String[] s) {

        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        DataStream<Event> input = env.fromElements(
                new Event(1, "aa", "2018-05-14 10:29:15"),
                new Event(1, "ab", "2018-05-14 10:29:25"),
                new Event(3, "ac", "2018-05-14 10:29:35"),
                new Event(4, "ad", "2018-05-14 10:29:45"),
                new Event(5, "ae", "2018-05-14 10:29:55"));


        DataStream<Event> withTimestampsAndWatermarks =
                input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Event element) {
                        try {
                            Date dt = formatter.parse(element.umsTs);
                            return dt.getTime();
                        } catch (ParseException e) {
                            e.printStackTrace();
                            return 0;
                        }
                    }
                });

        KeyedStream<Event, Long> partitionedInput = withTimestampsAndWatermarks.keyBy(new KeySelector<Event, Long>() {
            public Long getKey(Event e) {
                return e.id;
            }
        });

        Pattern<Event, Event> pattern = Pattern.<Event>begin("start")
                .subtype(Event.class)
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) {
                        return event.name.startsWith("a");
                    }
                }).within(Time.seconds(30));

        PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);

        DataStream<List<Event>> alerts = patternStream.select(
                new PatternSelectFunction<Event, List<Event>>() {
                    @Override
                    public List<Event> select(Map<String, List<Event>> pattern) {
                        List<Event> startEvent = pattern.get("start");
                        System.out.println("name:"+startEvent.get(0).name);
                        return startEvent;
                    }
                }
        );
        alerts.print();

        try {
            env.execute("start");
        } catch (Exception e) {
            e.printStackTrace();
        }


    }


}




Thanks!
sensun