Hello,
I am currently working on my master's and have run into a difficult problem.
Background (for context): I am trying to connect different data stream processors. To do so, I am using Flink's mechanism for custom sinks and sources to receive data from and send data to other data stream processors. In those custom sinks and sources I start a separate process (the message-buffer-process), which they communicate with and buffer data into. My implementation is built with Maven and can be added as a dependency.
Problem: I already tested my implementation by adding it as a dependency to a simple Flink word-count example. The test ran inside an IDE, where it works perfectly fine. But when I package that Flink word-count example and try to run it with "./flink run " or by uploading and submitting it as a job, it tells me that my buffer process class could not be found. In German: "Fehler: Hauptklasse de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess konnte nicht gefunden oder geladen werden". Roughly translated: "Error: Main class de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess could not be found or loaded".
Code snippet: an example of adding my custom sink to send data to another data stream processor:

    dataStream.addSink(
        (SinkFunction) DSPConnectorFactory
            .getInstance()
            .createSinkConnector(
                new DSPConnectorConfig
                    .Builder("localhost", 9656)
                    .withDSP("flink")
                    .withBufferConnectorString("buffer-connection-string")
                    .withHWM(20)
                    .withTimeout(10000)
                    .build()));
|
Hi
Since you use the message-buffer-process as a dependency and the error says the class was not found, have you checked whether your application jar actually contains MessageBufferProcess.class? If it does not, try using the maven-assembly-plugin or the maven-shade-plugin to include your classes.
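One way to run the check suggested above is a few lines of Java that open the packaged jar and look for the class entry. This is only a minimal sketch: the helper class `JarClassCheck` is made up for illustration, and the jar path is whatever your build produces.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarClassCheck {

    /** Returns true if the jar has an entry for the given fully qualified class name. */
    public static boolean containsClass(String jarPath, String className) throws IOException {
        // Class files are stored under their package path, e.g. a/b/C.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java JarClassCheck <path-to-job-jar>");
            return;
        }
        String className = "de.tuberlin.mcc.geddsprocon.messagebuffer.MessageBufferProcess";
        System.out.println(className + " present: " + containsClass(args[0], className));
    }
}
```

If this reports that the class is present, the packaging is probably not the problem, and the classpath of the process that is started becomes the more likely suspect.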
Best
Yun Tang
From: Ly, The Anh <[hidden email]>
Sent: Friday, November 2, 2018 6:33
To: [hidden email]
Subject: Starting a seperate Java process within a Flink cluster
|
Yes, I did. It is definitely there. I also created a separate Maven project to test whether something was wrong with my jar.
The resulting shaded jar of that test project was fine, and the message-buffer-process ran from that test jar.
On 02.11.2018 at 04:47, Yun Tang <[hidden email]> wrote:
|
The error is most likely due to a classpath issue, because the classpath when you run a Flink program in the IDE is different from the classpath when you run it on a cluster. Also, starting another JVM process in a SourceFunction doesn't seem like a good approach to me. Is it possible for you to do that work inside your custom SourceFunction instead?
On Friday, November 2, 2018 at 5:22 PM, Ly, The Anh <[hidden email]> wrote:
|
Hi, I am afraid that what you are trying to do will be extremely hard. In a cluster setup, not all dependencies are taken from the TaskManager classpath. The user code classes are loaded dynamically, so your new process, which does not have access to those user classes, cannot load them. Best, Dawid
On 02/11/2018 10:34, Jeff Zhang wrote:
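To illustrate the point about dynamically loaded user classes: a child JVM only sees what is passed to it on its own classpath. The following is a rough sketch of building the child command with an explicit `-cp` that points at the location a class was loaded from. It assumes that location is a local file (a fat jar containing the main class and its dependencies), which has not been verified against a real cluster here. `ChildJvmLauncher` and its methods are hypothetical names, not part of Flink or of the poster's library.

```java
import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChildJvmLauncher {

    /**
     * Path of the jar or directory the class was loaded from. Falls back to the
     * parent's java.class.path if the location is unknown or not a local file.
     */
    public static String codeSourcePath(Class<?> clazz) throws URISyntaxException {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        URL location = (source == null) ? null : source.getLocation();
        if (location == null || !"file".equals(location.getProtocol())) {
            return System.getProperty("java.class.path");
        }
        return new File(location.toURI()).getAbsolutePath();
    }

    /** Builds the command line for a child JVM with an explicit classpath. */
    public static List<String> buildCommand(String classpath, String mainClassName, String... args) {
        String javaBin = System.getProperty("java.home")
                + File.separator + "bin" + File.separator + "java";
        List<String> command = new ArrayList<>();
        command.add(javaBin);
        command.add("-cp");
        command.add(classpath);
        command.add(mainClassName);
        command.addAll(Arrays.asList(args));
        return command;
    }

    public static void main(String[] args) throws Exception {
        // In the real sink/source this would be the buffer process class (assumption).
        Class<?> mainClass = ChildJvmLauncher.class;
        List<String> command = buildCommand(codeSourcePath(mainClass), mainClass.getName());
        System.out.println(String.join(" ", command));
        // To actually start the child: new ProcessBuilder(command).inheritIO().start();
    }
}
```

Even if this resolves the class-not-found error, the other concern raised in this thread still applies: a process started from inside a sink or source is not managed by Flink.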