Matching largest event pattern without duplicates


Matching largest event pattern without duplicates

James Buchan
Hey all,

I'm trying to complete a small POC to see if Flink is suitable for our needs. The first step is to evaluate a stream of events and continually output the largest active group that does not contain duplicates. I'm attempting to do this with CEP pattern matching.

For example, for the following input:

>a
>a
>b
>c
>a
>c

I would expect an output of:

a
a
a:b
a:b:c
b:c:a
a:c
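
To make the rule concrete, here is a plain-Java sketch of the semantics I'm after (not a Flink job, just the behaviour; the class name is made up):

import java.util.ArrayList;
import java.util.List;

public class ExpectedOutputDemo {
  public static void main(String[] args) {
    List<String> seq = new ArrayList<>();
    for (String event : new String[] {"a", "a", "b", "c", "a", "c"}) {
      int dup = seq.indexOf(event);
      if (dup >= 0) {
        // Drop everything up to and including the earlier duplicate.
        seq = new ArrayList<>(seq.subList(dup + 1, seq.size()));
      }
      seq.add(event);
      System.out.println(String.join(":", seq)); // a, a, a:b, a:b:c, b:c:a, a:c
    }
  }
}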

The closest I've been able to get is the method below, which returns:

a
a
a:b
a:b:c
b:c:a
b:c
b
c:a
a:c
a
c

While the initial pattern continues to grow the output looks good, but as soon as a duplicate is seen I receive more results than I would like. This example uses the skipToFirst strategy; I expected the other strategies to be more helpful, but they produced even less desirable results.

This feels like it should be easily solvable but I've not been able to find the right combination of options to get it working.  Any assistance would be appreciated.

Here are the details of my latest method:

public static void cep() throws Exception {
  log.info("Initializing cep processor");

  String inputTopic = "inputTopic";
  String outputTopic = "outputTopic";
  String consumerGroup = "testGroup";
  String address = "localhost:9092";

  StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

  log.info("Creating consumer");
  FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(
      inputTopic, address, consumerGroup);
  flinkKafkaConsumer.setStartFromLatest();

  log.info("Creating producer");
  FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);

  log.info("Configuring sources");
  DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);

  log.info("Processing kafka messages");
  // After each match, resume from the first event of the "start" group.
  AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("start");
  Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
      .oneOrMore()
      // Stop expanding the match once the incoming event duplicates one
      // already collected in the "start" group.
      .until(new IterativeCondition<>() {
        @Override
        public boolean filter(String s, Context<String> context) throws Exception {
          return StreamSupport.stream(context.getEventsForPattern("start").spliterator(), false)
              .anyMatch(state -> state.equals(s));
        }
      });

  PatternStream<String> patternStream = CEP.pattern(stringInputStream, pattern);
  DataStream<String> result = patternStream.select(
      (PatternSelectFunction<String, String>) map ->
          String.format("Evaluated these states %s", String.join(":", map.get("start")))
  );
  result.addSink(flinkKafkaProducer);

  environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
  environment.execute("Flink cep Example");
}


Thanks!

-James

Re: Between Flink 1.9 and 1.11 - any behavior change for web.upload.dir

Avijit Saha
Hello,

Has there been any change in behavior related to "web.upload.dir" between Flink 1.9 and 1.11?

I have a failure case where, when building an image using "flink:1.11.0-scala_2.12" in the Dockerfile, job submissions to the job manager fail with the following exception, but the same flow works fine (for the same underlying code image) when using "flink:1.9.1-scala_2.12".

This is the exception stack trace with 1.11; it is not seen with 1.9:
------------------------------------------------------------------------------------------
Caused by: java.nio.file.FileAlreadyExistsException: /opt/flink/flink-web-upload
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:88) ~[?:1.8.0_262]
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_262]
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_262]
        at sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:384) ~[?:1.8.0_262]
        at java.nio.file.Files.createDirectory(Files.java:674) ~[?:1.8.0_262]
        at java.nio.file.Files.createAndCheckIsDirectory(Files.java:781) ~[?:1.8.0_262]
        at java.nio.file.Files.createDirectories(Files.java:727) ~[?:1.8.0_262]
        at org.apache.flink.runtime.rest.RestServerEndpoint.checkAndCreateUploadDir(RestServerEndpoint.java:478) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rest.RestServerEndpoint.createUploadDir(RestServerEndpoint.java:462) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rest.RestServerEndpoint.<init>(RestServerEndpoint.java:114) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.webmonitor.WebMonitorEndpoint.<init>(WebMonitorEndpoint.java:200) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint.<init>(DispatcherRestEndpoint.java:68) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.rest.SessionRestEndpointFactory.createRestEndpoint(SessionRestEndpointFactory.java:63) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:152) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        ... 2 more  
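
For what it's worth, Files.createDirectories only fails with FileAlreadyExistsException when the target path already exists as something that is not a directory (e.g. a regular file or a dangling symlink), so /opt/flink/flink-web-upload in the 1.11 image is probably occupied by a non-directory. A few lines of plain Java reproduce the same exception (the path and class name here are made up, just to illustrate):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class UploadDirDemo {
  public static void main(String[] args) throws Exception {
    Path target = Paths.get("/tmp/flink-web-upload-demo");
    Files.deleteIfExists(target);
    Files.createFile(target);        // a regular file now occupies the path
    Files.createDirectories(target); // throws FileAlreadyExistsException
  }
}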

Re: Matching largest event pattern without duplicates

Dawid Wysakowicz-2
In reply to this post by James Buchan

Hi James,

I think it is not easy to achieve with the CEP library. Adding the consecutive quantifier to the oneOrMore pattern (see the sketch below) should eliminate a few of the unwanted cases from your example (`b:c`, `b`, `a`, `c`), but it would not eliminate `c:a`. The problem is that you need to skip to the first duplicate in the chain, and there is no method that would let you do such a "conditional jump".
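
For concreteness, that variant is a drop-in change to the pattern in your method, reusing your skipStrategy and the same until condition:

Pattern<String, ?> pattern = Pattern.<String>begin("start", skipStrategy)
    .oneOrMore()
    .consecutive() // contiguous events only; removes cases like b:c and b, but c:a still remains
    .until(new IterativeCondition<>() {
      @Override
      public boolean filter(String s, Context<String> context) throws Exception {
        return StreamSupport.stream(context.getEventsForPattern("start").spliterator(), false)
            .anyMatch(state -> state.equals(s));
      }
    });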

I'd recommend implementing the logic with e.g. a custom FlatMap function and ListState [1], where you could keep the sequence in state and prune the leading elements up to and including the duplicate.
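
A minimal, untested sketch of that idea (the class name is made up, and it assumes a keyed stream; keying by a constant gives you a single logical sequence but also funnels the whole stream through one key):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class LargestGroupFunction extends RichFlatMapFunction<String, String> {

  private transient ListState<String> sequence;

  @Override
  public void open(Configuration parameters) {
    sequence = getRuntimeContext().getListState(
        new ListStateDescriptor<>("sequence", String.class));
  }

  @Override
  public void flatMap(String event, Collector<String> out) throws Exception {
    // Materialize the current duplicate-free sequence from state.
    List<String> current = new ArrayList<>();
    Iterable<String> stored = sequence.get();
    if (stored != null) {
      for (String s : stored) {
        current.add(s);
      }
    }
    int dup = current.indexOf(event);
    if (dup >= 0) {
      // Prune the leading elements up to and including the duplicate.
      current = new ArrayList<>(current.subList(dup + 1, current.size()));
    }
    current.add(event);
    sequence.update(current);
    out.collect(String.join(":", current));
  }
}

You would wire it in with something like:

stringInputStream.keyBy(s -> 0).flatMap(new LargestGroupFunction()).addSink(flinkKafkaProducer);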

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state
