[Flink-Siddhi]: Issue processing siddhi CEP task on flink


Dipanjan Mazumder

Hi Guys,

    I am facing another integration challenge with flink-siddhi that I have not been able to crack yet. I have been trying to debug the flow inside Flink, but being new to it, that is taking a lot of time. The problem: when I publish a message through Kafka and the Flink application/job does CEP processing with SiddhiCEP, the taskmanager does not process the task for the operator that contains the Siddhi CEP logic, and no error is thrown anywhere. I started debugging the taskmanager but have not got anywhere after four days of trying. I have created a Stack Overflow question with more details on the program and application.

Flink version : 1.13
Scala: 2.11
Java: 1.8
I am using the flink-siddhi library: haoch/flink-siddhi 2.11-0.2.2-SNAPSHOT
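
For context, the Kafka-to-Flink side of the job is wired roughly like the sketch below. This is a simplified illustration, not the actual code: the class name, topic, group id and the print() sink are placeholders just to show the shape of the pipeline; the Siddhi operator that follows is where the task never gets processed.

```
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CepJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "cep-debug");               // placeholder

        // Kafka source feeding the job (FlinkKafkaConsumer is the Flink 1.13 connector API)
        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        // ... parse `raw` into a typed stream and hand it to the flink-siddhi CEP operator;
        //     that Siddhi part is where it gets stuck ...
        raw.print();

        env.execute("siddhi-cep-debug");
    }
}
```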




The link is:





If anyone can give a pointer on this problem, it would save my day. So please kindly respond.

Thanks in advance.

Regards
Dipanjan
On Tuesday, May 25, 2021, 12:48:59 PM GMT+5:30, Dipanjan Mazumder <[hidden email]> wrote:


Hi All,

    Found the solution.

Problem: I was using an intermediate library to integrate Siddhi with Flink (https://github.com/haoch/flink-siddhi). I was creating a SiddhiCEP instance, registering the extension on it, and then calling "define()" on that instance; however, define() creates an internal SiddhiCEP instance of its own and uses that for processing, so the extension registered on my instance was never picked up. I found this out by debugging the application; it is an implementation issue in the library itself.
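
In code, the broken pattern looked roughly like this. It is a simplified fragment, not the actual job: `env` and `input` stand for the execution environment and the parsed input stream, imports are omitted, and `MyExtensionFunction`, "custom:myExt" and the stream/field names are placeholders (method names follow the library's README and may differ slightly in the snapshot version).

```
// extension registered on `cep`, but define(...) builds its own internal
// SiddhiCEP environment, so the registration is never seen and the
// "custom:myExt" function is unknown at runtime
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerExtension("custom:myExt", MyExtensionFunction.class);

cep.define("inputStream", input, "id", "payload")
   .cql("from inputStream select custom:myExt(payload) as result insert into outputStream")
   .returns("outputStream")
   .print();
```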

Solution: I used the from() method on the SiddhiCEP instance I had created instead of define(). With from(), the instance I created is the one used for the rest of the processing, so the registered extensions were accounted for and recognised at runtime.
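
The working shape, again simplified with the same placeholder names (depending on the library version, from() may need the stream to be registered first, or may take the DataStream and field names directly):

```
// the query is built from the same `cep` instance, so the registered
// extension is part of the environment that actually runs the CEP
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
cep.registerExtension("custom:myExt", MyExtensionFunction.class);
cep.registerStream("inputStream", input, "id", "payload");

cep.from("inputStream")
   .cql("from inputStream select custom:myExt(payload) as result insert into outputStream")
   .returns("outputStream")
   .print();
```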

Regards
Dipanjan

On Friday, May 21, 2021, 01:51:09 PM GMT+5:30, Salva Alcántara <[hidden email]> wrote:


Hi Dipanjan,

I agree with Till. If the extensions are included in the jar for your
job, it should work. I was having the same doubts some weeks ago and can
confirm that as long as the jar includes those extensions, it works.

One thing I needed to do was to register the different extensions. For
example:

```
siddhiManager.setExtension("map:create", classOf[CreateFunctionExtension])
```

Regards,


Salva



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/