Statefun with RabbitMQ consumes message but does not run statefun


Stephan Pelikan

Hi,

 

I am trying to use RabbitMQ as a source. My source consumes messages from the queue, but the statefun is never executed; it is not even created.

 

This is my main function:

 

1 public static void main(String[] args) throws Exception {
2
3     final var env = StreamExecutionEnvironment.getExecutionEnvironment();
4
5     env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
6
7     env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE);
8     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
9     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
10     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
11
12     final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
13     statefunConfig.setFlinkJobName("test");
14     statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
15
16     final var connectionConfig = new RMQConnectionConfig.Builder()
17             .setHost("localhost")
18             .setUserName("guest")
19             .setPassword("guest")
20             .setPort(5672)
21             .setVirtualHost("test")
22             .setPrefetchCount(5000)
23             .build();
24
25     final var deserializationSchema = new TypeInformationSerializationSchema<>(
26             new ProtobufTypeInformation<>(Any.class), env.getConfig());
27     final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, true, deserializationSchema);
28
29     final var source = env
30             .addSource(rmqSource, TEST_INGRESS)
31             .setParallelism(1)
32             .map(msg -> {
33                 return RoutableMessageBuilder
34                     .builder()
35                     .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
36                     .withMessageBody(msg)
37                     .build();
38             });
39
40     StatefulFunctionDataStreamBuilder
41             .builder("test")
42             .withDataStreamAsIngress(source)
43             .withFunctionProvider(MyStatefun.TYPE, unused -> {
44                 return new MyStatefun();
45             })
46             .withEgressId(MyStatefun.EGRESS)
47             .withConfiguration(statefunConfig)
48             .build(env)
49             .getDataStreamForEgressId(MyStatefun.EGRESS)
50             .addSink(new PrintSinkFunction<>(true));
51
52     env.execute();
53
54 }

 

A breakpoint on line 33 shows me the consumed messages. A breakpoint on line 44 is never hit. The message is reportedly consumed but never acknowledged or processed. Before using RabbitMQ, I used a custom SourceFunction to fake input data, and that worked well.

 

To set things up I use a local environment, but the logs do not show any errors. Before my current problem I had another error during message deserialization, and it wasn’t reported either. Unfortunately, I didn’t manage to get the exception into the log or stdout; I had to use the debugger to find the cause of that earlier problem. This time the debugger shows no thrown or caught exceptions. That’s why I’m stuck.

 

Of course I would like to know what’s wrong with my code, but I guess it is not obvious. Maybe someone can give me a hint on how to turn on exception logging, which might help me get closer to the origin of the problem.
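One low-tech way to surface hidden failures, independent of the logging setup, is to catch whatever the job throws and print every nested cause. The sketch below is plain Java; `CauseChain` is a hypothetical helper, not part of Flink or StateFun, and the exceptions in `main` are made up to stand in for a failing `env.execute()`. It only helps if the failure actually propagates to the caller, which may not be the case in the situation described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Flink or StateFun.
final class CauseChain {

    private CauseChain() {}

    /** Returns one line per throwable in the cause chain, outermost first. */
    static List<String> describe(Throwable error) {
        List<String> lines = new ArrayList<>();
        Throwable current = error;
        // Stop at the end of the chain, and cap the depth in case a chain loops back on itself.
        while (current != null && lines.size() < 50) {
            lines.add(current.getClass().getName() + ": " + current.getMessage());
            current = current.getCause();
        }
        return lines;
    }

    public static void main(String[] args) {
        try {
            // Stand-in for env.execute(); these nested exceptions are invented for the demo.
            throw new RuntimeException("job failed",
                    new IllegalStateException("class incompatibility"));
        } catch (Exception e) {
            describe(e).forEach(System.err::println);
        }
    }
}
```

In a job like the one above, the `try` block would wrap the `env.execute()` call, so the innermost cause is printed even when the logger is not configured.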

 

Thanks in advance,

Stephan

 


Re: Statefun with RabbitMQ consumes message but does not run statefun

Stephan Pelikan

I found the reason: There is a class incompatibility because I changed from

    Statefun 2.2.1 + Flink 1.11.1

to

    Statefun 2.2.1 + Flink 1.12.0

 

But even the newest StateFun version, 2.2.2, depends on Flink 1.11.3.
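A mismatch like this can be caught early with a startup guard that compares the Flink release line the job was tested with against the one actually in use. The sketch below is plain Java; `FlinkVersionLine` is a hypothetical helper, the version strings are passed in by hand, and treating "same major.minor" as compatible is an assumption made for illustration, not a rule stated by either project.

```java
// Hypothetical helper for illustration; not part of Flink or StateFun.
final class FlinkVersionLine {

    private FlinkVersionLine() {}

    /** Extracts "major.minor" from a version such as "1.11.3" or "1.12.0". */
    static String minorLine(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch], got: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** True when both versions belong to the same major.minor release line. */
    static boolean sameMinorLine(String expected, String actual) {
        return minorLine(expected).equals(minorLine(actual));
    }

    public static void main(String[] args) {
        String testedWith = "1.11.1";  // Flink version reported as working with StateFun 2.2.1
        String onClasspath = "1.12.0"; // Flink version that led to the class incompatibility
        if (!sameMinorLine(testedWith, onClasspath)) {
            System.err.println("Flink " + onClasspath + " is not on the "
                    + minorLine(testedWith) + ".x line");
        }
    }
}
```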

 

Is it possible to use the newest version of Flink together with the newest version of StateFun? I’m wondering why there is no StateFun version matching the current stable version of Flink.

 

Stephan

 

 

From: Stephan Pelikan <[hidden email]>
Sent: Monday, 11 January 2021 19:37
To: [hidden email]
Subject: Statefun with RabbitMQ consumes message but does not run statefun

 


Re: Statefun with RabbitMQ consumes message but does not run statefun

Tzu-Li (Gordon) Tai
Hi,

StateFun releases are not made in lock-step with Flink releases. StateFun and Flink have individual release schemes and schedules.

Usually, for new major StateFun releases, we upgrade the Flink dependency to the latest available version.
We are currently targeting mid-February for the next major StateFun release, by which time the Flink dependency will be upgraded to 1.12.x.
In the meantime, if you'd like to work against Flink 1.12.x with StateFun, you might have to resort to building the artifacts yourself.

Cheers,
Gordon

On Tue, Jan 12, 2021 at 3:57 PM Stephan Pelikan <[hidden email]> wrote:


Re: Statefun with RabbitMQ consumes message but does not run statefun

Stephan Pelikan

> currently targeting mid February for the next major StateFun release

 

Thank you, Gordon, for sharing this information. As I can now see on the Flink blog, you release new versions frequently. I’m new to Flink and had assumed that the StateFun and Flink docs would match. Of course it is not a problem; I was just surprised.

 

Thanks,

Stephan

 

 

From: Tzu-Li (Gordon) Tai <[hidden email]>
Sent: Tuesday, 12 January 2021 10:07
To: Stephan Pelikan <[hidden email]>
Cc: [hidden email]
Subject: Re: Statefun with RabbitMQ consumes message but does not run statefun

 
