activemq connector not working..

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

activemq connector not working..

Puneet Kinra-2
Hi 

I used apache bahir connector  below is the code.the job is getting finished
and not generated the output as well ,ideal it should keep on running below the code.


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 *
 */
public class TestAMQ {


public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"))
.setDestinationName("test")
.setDeserializationSchema(new SimpleStringSchema())
.setDestinationType(DestinationType.QUEUE)
.build();
DataStream < String > messageStream = env.addSource(new AMQSource<String>(sourceConfig)); 
messageStream.print();
env.execute();
}

}


--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: activemq connector not working..

Timo Walther
Hi Puneet,

are you running this job on the cluster or locally in your IDE?

Regards,
Timo


Am 14.03.18 um 13:49 schrieb Puneet Kinra:
Hi 

I used apache bahir connector  below is the code.the job is getting finished
and not generated the output as well ,ideal it should keep on running below the code.


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 *
 */
public class TestAMQ {


public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"))
.setDestinationName("test")
.setDeserializationSchema(new SimpleStringSchema())
.setDestinationType(DestinationType.QUEUE)
.build();
DataStream < String > messageStream = env.addSource(new AMQSource<String>(sourceConfig)); 
messageStream.print();
env.execute();
}

}


--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]



Reply | Threaded
Open this post in threaded view
|

Re: activemq connector not working..

Puneet Kinra-2
I tried in cluster as well .

On Wed, Mar 14, 2018 at 10:01 PM, Timo Walther <[hidden email]> wrote:
Hi Puneet,

are you running this job on the cluster or locally in your IDE?

Regards,
Timo


Am 14.03.18 um 13:49 schrieb Puneet Kinra:
Hi 

I used apache bahir connector  below is the code.the job is getting finished
and not generated the output as well ,ideal it should keep on running below the code.


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 *
 */
public class TestAMQ {


public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"))
.setDestinationName("test")
.setDeserializationSchema(new SimpleStringSchema())
.setDestinationType(DestinationType.QUEUE)
.build();
DataStream < String > messageStream = env.addSource(new AMQSource<String>(sourceConfig)); 
messageStream.print();
env.execute();
}

}


--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]






--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]


Reply | Threaded
Open this post in threaded view
|

Re: activemq connector not working..

Puneet Kinra-2

I tried getting this in logs..


2018-03-15 20:59:38,154 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask           - No state backend has been configured, using default state backend (Memory / JobManager)

2018-03-15 20:59:38,296 INFO  org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase  - No state to restore for the AMQSource.

2018-03-15 20:59:39,488 WARN  org.apache.flink.streaming.connectors.activemq.AMQSource      - Active MQ source received non bytes message: null



On Thu, Mar 15, 2018 at 9:00 PM, Puneet Kinra <[hidden email]> wrote:
I tried in cluster as well .

On Wed, Mar 14, 2018 at 10:01 PM, Timo Walther <[hidden email]> wrote:
Hi Puneet,

are you running this job on the cluster or locally in your IDE?

Regards,
Timo


Am 14.03.18 um 13:49 schrieb Puneet Kinra:
Hi 

I used apache bahir connector  below is the code.the job is getting finished
and not generated the output as well ,ideal it should keep on running below the code.


import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.activemq.AMQSource;
import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
import org.apache.flink.streaming.connectors.activemq.DestinationType;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

/**
 * @author puneet
 *
 */
public class TestAMQ {


public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
.setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"))
.setDestinationName("test")
.setDeserializationSchema(new SimpleStringSchema())
.setDestinationType(DestinationType.QUEUE)
.build();
DataStream < String > messageStream = env.addSource(new AMQSource<String>(sourceConfig)); 
messageStream.print();
env.execute();
}

}


--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]






--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]





--
Cheers 

Puneet Kinra

Mobile:+918800167808 | Skype : [hidden email]

e-mail :[hidden email]