Hi,
I am trying to run https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example locally and have several questions.
1. It seems fairly straightforward to use it with in-memory message generators, but I can't figure out how to add a Kafka ingress/egress so that I can use it with Kafka.
2. GreetingModule already creates a StatefulFunctionsUniverse and so does the Harness. Is there a way to short-circuit this and have the Harness get the StatefulFunctionsUniverse directly?
3. Is there an example of how to write a Flink main for stateful functions?
4. Is there an example anywhere of how to run such examples in the IDE with Kafka?
5. There is a great stateful functions example, https://github.com/ververica/flink-statefun-workshop, but its README does not really describe the implementation, and neither does the article referencing it, https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is there anything that describes this implementation?
Also, where do I put flink-conf.yaml in IDEA to add an additional required config parameter?
Hi, On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <[hidden email]> wrote:
Could you provide some context on why you would want to do that? The StateFun Flink Harness was not intended to work with the usual shipped ingresses / egresses, but purely as a utility for users to run StateFun applications in a consolidated local setup. For testing against Kafka, I would suggest looking at how the StateFun end-to-end tests do it, using testcontainers. The tests are located under the `statefun-e2e-tests` module. If you still want to use the Flink Harness for this, you may be able to use the `withFlinkSourceFunction` method to directly supply the Flink Kafka connector. This only works for the ingress side, though.
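For the ingress side, a rough sketch of what wiring the Flink Kafka connector into the Harness via `withFlinkSourceFunction` could look like (untested; the identifier namespace/name, topic, and class names are made up, and this assumes the Flink Kafka connector is on the classpath):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.statefun.flink.harness.Harness;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaHarnessRunner {

  // Must match the ingress identifier that your module's router is bound to.
  static final IngressIdentifier<String> NAMES =
      new IngressIdentifier<>(String.class, "example", "names");

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "greeter");

    new Harness()
        // Feed the StateFun ingress directly from a Flink Kafka source.
        .withFlinkSourceFunction(
            NAMES, new FlinkKafkaConsumer<>("names", new SimpleStringSchema(), props))
        .start();
  }
}
```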
That is not possible. The StatefulFunctionUniverse that the Harness utility provides is always a "mock" one, which contains the defined in-memory ingress and egresses. As previously mentioned, that is because the Flink Harness was intended for running StateFun applications without the need to interact with any other external systems.
At the moment, it is not possible to directly integrate Flink APIs and Stateful Functions APIs in a single job. What do you have in mind for what you want to achieve?
The tests in `statefun-e2e-tests` can be run in the IDE and test against Kafka. They do require Docker to be available, though.
I think the bottom half of the article provides some details of the example, including the messaging between functions and a rough sketch of the functions. Maybe it's not detailed enough? In particular, what parts of the example would you want to have more details on? Cheers, Gordon
Sorry, forgot to cc user@ as well in the last reply. On Fri, May 22, 2020 at 12:01 PM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
In reply to this post by Boris Lublinsky
Are you getting an exception from running the Harness? The Harness should already have the required configurations, such as the parent-first classloading config. Otherwise, if you would like to add your own configuration, use the `withConfiguration` method on the `Harness` class. On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <[hidden email]> wrote:
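For reference, a minimal sketch of `withConfiguration` usage (the key shown is the parent-first classloading option from the StateFun deployment docs, which the Harness should already set; it is only here to illustrate the call):

```java
import org.apache.flink.statefun.flink.harness.Harness;

public class ConfiguredHarnessRunner {
  public static void main(String[] args) throws Exception {
    new Harness()
        // Each call adds one key/value pair that would otherwise
        // live in flink-conf.yaml.
        .withConfiguration(
            "classloader.parent-first-patterns.additional",
            "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf")
        .start();
  }
}
```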
Hi, Sorry, I need to correct my comment on using the Kafka ingress / egress with the Harness. That is actually doable, by adding an extra dependency on `statefun-flink-distribution` to your Harness program. That pulls in all the other dependencies required by the Kafka ingress / egress, such as the source / sink providers and the Flink Kafka connectors. Cheers, Gordon On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai <[hidden email]> wrote:
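In Maven terms, that extra dependency would look roughly like this (the version shown matches the 2.0 release discussed in this thread; adjust to your StateFun version):

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>statefun-flink-distribution</artifactId>
  <version>2.0.0</version>
</dependency>
```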
Thanks,
I will try your suggestions. One more question: is StatefulFunctionsJob the class used for stateful function execution in the Docker case?
In reply to this post by Tzu-Li (Gordon) Tai
Also, there seems to be a problem in the class public final class Modules. When I am running the Harness example it fails with the error:

    Exception in thread "main" java.lang.IllegalStateException: There are no routers defined.
        at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:31)
        at org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:66)
        at org.apache.flink.statefun.flink.harness.Harness.start(Harness.java:128)
        at com.lightbend.flink.statefun.examples.greeter.HarnessRunner.main(HarnessRunner.java:18)

This seems to be due to the fact that the method public static Modules loadFromClassPath() returns no stateful functions, just I/O modules.
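For context, the validator is complaining that no discovered module bound an ingress router. A minimal module that would satisfy it might look like this (a sketch; the function type, identifier, and class names are made up, and the module still has to be discoverable on the classpath):

```java
import java.util.Map;

import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class GreeterModule implements StatefulFunctionModule {

  static final FunctionType GREETER = new FunctionType("example", "greeter");
  static final IngressIdentifier<String> NAMES =
      new IngressIdentifier<>(String.class, "example", "names");

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    // Without at least one router binding, universe validation fails with
    // "There are no routers defined."
    binder.bindIngressRouter(
        NAMES, (message, downstream) -> downstream.forward(GREETER, message, message));
  }
}
```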
In reply to this post by Tzu-Li (Gordon) Tai
The project https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example does not work in IntelliJ. The problem is that, when running in IntelliJ, the method public static Modules loadFromClassPath() does not pick up classes that are local in IntelliJ. Any workarounds?
Hi, The example is working fine on my side (also using IntelliJ). This could most likely be a problem with your project setup in the IDE, where the classpath isn't set up correctly. What do you see when you right-click on the statefun-flink-harness-example directory (in the IDE) --> Open Module Settings, and then under the "Sources" / "Dependencies" tabs? Usually this should all be set up correctly automatically when importing the project. Gordon On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <[hidden email]> wrote:
I think I figured this out.
The project seems to be missing the service resource files. Another question: I see the org.apache.flink.statefun.flink.io.datastream.SourceSinkModule class, which I think allows using existing data streams as ingress/egress. Are there any examples of its usage?
Yes, the functions / ingresses / egresses etc. are not discoverable if the service file isn't present in the classpath. The examples, if you are running them straight from the repo, should all have that service file defined and therefore be readily runnable. If you are creating your own application project, you'll have to add it yourself.
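Concretely, discovery goes through `java.util.ServiceLoader`, so the module class has to be listed in a provider-configuration file named after the SPI interface. The file looks something like this (the implementing class name below is made up):

```
# src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
com.example.greeter.GreeterModule
```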
On the Harness class, there is a withFlinkSourceFunction method with which you can directly add a Flink source function as the ingress. If you want to use that directly in a normal application (not just execution in the IDE with the Harness), you can define your ingresses/egresses by binding SourceFunctionSpec / SinkFunctionSpec. Please see how they are being used in the Harness class for examples. Gordon
OK, so the bug in the examples is the absence of resources; having classes in the classpath is not sufficient.
Modules.java uses ServiceLoader, which sets private static final String PREFIX = "META-INF/services/",
so all the modules have to be listed in the resource files.
In reply to this post by Tzu-Li (Gordon) Tai
That's not exactly the usage question I am asking.
When I am writing an I/O module I have to write an ingress and egress spec. You have an example for Kafka, which looks like def getIngressSpec: IngressSpec[GreetRequest] = How is it going to look if I am using SourceSinkModule? Do I just specify stream names? Something else?
Hi Boris, Example usage of Flink sources and sinks is available in the documentation [1]. On Wed, May 27, 2020 at 1:08 PM Boris Lublinsky <[hidden email]> wrote:
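As a rough sketch of that documented pattern (untested; the identifiers, topic, and class names are made up), binding an existing Flink source/sink as StateFun ingress/egress looks something like:

```java
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StreamIoModule implements StatefulFunctionModule {

  static final IngressIdentifier<String> IN =
      new IngressIdentifier<>(String.class, "example", "in");
  static final EgressIdentifier<String> OUT =
      new EgressIdentifier<>("example", "out", String.class);

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");

    // Any existing Flink SourceFunction / SinkFunction can be wrapped this way;
    // no stream names beyond the identifiers are involved.
    binder.bindIngress(new SourceFunctionSpec<>(
        IN, new FlinkKafkaConsumer<>("in", new SimpleStringSchema(), props)));
    binder.bindEgress(new SinkFunctionSpec<>(OUT, new PrintSinkFunction<>()));
  }
}
```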
Thanks Seth
Will take a look.
In reply to this post by Seth Wiesman
Also I have noticed that a few StateFun jars, including statefun-flink-core, statefun-flink-io, and statefun-flink-harness, are built for Scala 2.11. Is it possible to create versions of those for Scala 2.12?