Working out through individual messages in Flink

Working out through individual messages in Flink

Mich Talebzadeh

Hi,

I have a Kafka topic that transmits 100 security prices every 2 seconds.

In Spark Streaming I go through the RDD, walk through the rows one by one and check the prices.
If the prices are valuable (here, above a threshold) I store them in an HBase table:

    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
    dstream.cache()
    dstream.foreachRDD { pricesRDD =>
      // Work on individual messages
      for (line <- pricesRDD.collect.toArray) {
        // each message value is a CSV line: key,ticker,timeissued,price
        val columns       = line._2.split(',')
        val key           = columns(0)
        val ticker        = columns(1)
        val timeissued    = columns(2)
        val price         = columns(3).toFloat
        val priceToString = columns(3)
        if (price > 90.0) {
          // save to HBase table
        }
      }
    }

That works fine. 

In Flink I define my source as below:

    val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val stream = streamExecEnv
       .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

Is there any way I can perform a similar operation in Flink? I need to go through every batch of prices sent to the topic and look at the individual rows. For example, what is the equivalent of

    for (line <- pricesRDD.collect.toArray)

in Flink?

Thanks

Dr Mich Talebzadeh

 

LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 

http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 

Re: Working out through individual messages in Flink

Renjie Liu
Hi Mich,
You can write a sink function for that.
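
For reference, a minimal sketch of what such a sink function could look like, assuming the Kafka messages arrive as plain CSV strings (the class name is only illustrative):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

    // Skeleton sink: invoked once per Kafka message (a CSV string)
    class HBasePriceSink extends RichSinkFunction[String] {
      override def open(parameters: Configuration): Unit = {
        // open the HBase connection once per parallel sink instance
      }
      override def invoke(value: String): Unit = {
        // parse the CSV line and write a Put to HBase here
      }
      override def close(): Unit = {
        // close the HBase connection
      }
    }

    // attach it to the Kafka stream:
    // stream.addSink(new HBasePriceSink)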

--
Liu, Renjie
Software Engineer, MVAD
Re: Working out through individual messages in Flink

Mich Talebzadeh
In reply to this post by Mich Talebzadeh
Just to clarify, these are the individual prices separated by ','. The lines below show three price rows in the topic:

UUID,                            Security,         Time,        Price
1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33



Re: Working out through individual messages in Flink

Fabian Hueske-2
Hi,

Flink processes streams record by record, instead of micro-batching records together. Since every record comes by itself, there is no for-each.
Simple record-by-record transformations can be done with a MapFunction, and filtering with a FilterFunction. You can also implement a FlatMapFunction to do both in one step.

Once the stream is transformed and filtered, you can write it to HBase with a sink function.
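
A rough sketch of that approach on the Kafka string stream defined earlier, assuming the Scala DataStream API and the CSV layout key,ticker,timeissued,price (the case class, threshold and sink are only illustrative):

    import org.apache.flink.streaming.api.scala._

    case class Price(key: String, ticker: String, timeissued: String, price: Float)

    // parse each CSV message into a Price and keep only the interesting ones
    val prices: DataStream[Price] = stream
      .map { line =>
        val cols = line.split(',')
        Price(cols(0), cols(1), cols(2), cols(3).toFloat)
      }
      .filter(_.price > 90.0f)

    // prices.addSink(...)   // e.g. a sink function that writes each Price to HBase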



Re: Working out through individual messages in Flink

Mich Talebzadeh
Thanks Fabian. That was very useful.

How about an operation like below?

         // create builder
         val KafkaTableSource = Kafka011JsonTableSource.builder()
            // set Kafka topic
            .forTopic(topicsValue)
            // set Kafka consumer properties
            .withKafkaProperties(properties)
            // set Table schema
            .withSchema(TableSchema.builder()
              .field("key", Types.STRING)
              .field("ticker", Types.STRING)
              .field("timeissued", Types.STRING)
              .field("price", Types.FLOAT)
              .build())
            // finish building the TableSource
            .build()

Will that be OK?



Re: Working out through individual messages in Flink

Fabian Hueske-2
A *Table*Source [1] is a special input connector for Flink's relational APIs (Table API and SQL) [2].
You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query.

However, there is no *Table*Sink for HBase and you would need to convert the Table back to a DataStream [3].
That's not very difficult since the APIs are integrated with each other.
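
For illustration, a rough sketch of that conversion, registering the TableSource built above under the name "priceTable" (the names and the threshold are only examples):

    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.types.Row

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    // expose the Kafka-backed TableSource to the Table API
    tableEnv.registerTableSource("priceTable", KafkaTableSource)

    // SELECT / WHERE on the registered table
    val expensive = tableEnv
      .scan("priceTable")
      .filter('price > 90.0f)
      .select('key, 'ticker, 'timeissued, 'price)

    // convert the Table back to a DataStream of Rows and attach a sink
    val rowStream: DataStream[Row] = expensive.toAppendStream[Row]
    // rowStream.addSink(...)   // sink function that writes each Row to HBase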

Best, Fabian


Re: Working out through individual messages in Flink

Mich Talebzadeh
Thanks again.

The HBase connector works fine in Flink:

    // Start HBase table stuff
    val tableName = "MARKETDATAHBASESPEEDFLINK"
    val hbaseConf = HBaseConfiguration.create()
    // Connecting to remote HBase
    hbaseConf.set("hbase.master", hbaseHost)
    hbaseConf.set("hbase.zookeeper.quorum", zookeeperHost)
    hbaseConf.set("hbase.zookeeper.property.clientPort", zooKeeperClientPort)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // create this table with its column families if it does not exist
    val admin = new HBaseAdmin(hbaseConf)
    if (!admin.isTableAvailable(tableName)) {
      println("Creating table " + tableName)
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor("PRICE_INFO".getBytes()))
      tableDesc.addFamily(new HColumnDescriptor("OPERATION".getBytes()))
      admin.createTable(tableDesc)
    } else {
      println("Table " + tableName + " already exists!!")
    }
    val HbaseTable = new HTable(hbaseConf, tableName)
    // End HBase table stuff

So I just need to split every row into columns and put them into HBase as follows:

            // Save prices to HBase table
             var p = new Put(new String(key).getBytes())
             //p.add("PRICE_INFO".getBytes(), "key".getBytes(), new String(ticker).getBytes())
             p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
             p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
             p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
             p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
             p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
             p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
             HbaseTable.put(p)
             HbaseTable.flushCommits()
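
In Flink this per-record logic would normally live inside a sink function rather than in the main program. A rough sketch, assuming the upstream operators produce the already-split fields as a tuple (key, ticker, timeissued, price); class and column names follow the snippet above but are otherwise illustrative:

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HTable, Put}

    // One HTable per parallel sink instance, one Put per record
    class HBasePutSink(tableName: String) extends RichSinkFunction[(String, String, String, Float)] {

      @transient private var table: HTable = _

      override def open(parameters: Configuration): Unit = {
        val conf = HBaseConfiguration.create()
        table = new HTable(conf, tableName)
      }

      override def invoke(value: (String, String, String, Float)): Unit = {
        val (key, ticker, timeissued, price) = value
        val p = new Put(key.getBytes())
        p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), ticker.getBytes())
        p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), timeissued.getBytes())
        p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), price.toString.getBytes())
        table.put(p)
        table.flushCommits()
      }

      override def close(): Unit = {
        if (table != null) table.close()
      }
    }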



Re: Working out through individual messages in Flink

Mich Talebzadeh
In reply to this post by Fabian Hueske-2
Hi Fabian,

Following your notes above, I have converted the Table back to a DataStream.

    val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
    tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)

           val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
           val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
           val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
           val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

My intention is to create an HBase sink as follows:

            // Save prices to Hbase table
             var p = new Put(new String(key).getBytes())
             p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(),          new String(ticker).getBytes())
             p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(),     new String(timeissued).getBytes())
             p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(),           new String(priceToString).getBytes())
             p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(),         new String(CURRENCY).getBytes())
             p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(),         new String(1.toString).getBytes())
             p.add("OPERATION".getBytes(), "OP_TIME".getBytes(),         new String(System.currentTimeMillis.toString).getBytes())
             HbaseTable.put(p)
             HbaseTable.flushCommits()

However, I don't seem to be able to get the correct values for the columns!
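
One way this is often handled is to keep all four columns in a single Row stream and read the fields by position inside a map (or a sink function), rather than scanning the table once per column. A rough sketch using the field order registered above (toAppendStream is assumed here; older versions used toDataStream):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._
    import org.apache.flink.types.Row

    // one stream carrying all four columns per record
    val rows: DataStream[Row] = tableEnv
      .scan("priceTable")
      .select('key, 'ticker, 'timeissued, 'price)
      .toAppendStream[Row]

    rows.map { row =>
      val key        = row.getField(0).toString
      val ticker     = row.getField(1).toString
      val timeissued = row.getField(2).toString
      val price      = row.getField(3).toString.toFloat
      // build the HBase Put from these values, or hand them to a sink function
      (key, ticker, timeissued, price)
    }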


Re: Working out through individual messages in Flink

Jörn Franke
Hi Mich,

Would it be possible to share the full source code?
I am missing a call to streamExecEnvironment.execute().
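
Without that call the job graph is only built, never run. Roughly, something like this at the end of the program (assuming the environment variable is named streamExecEnv as in the earlier snippets; the job name is just an example):

    // trigger execution of the assembled streaming job; nothing runs until this call
    streamExecEnv.execute("md_streaming")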

Best regards 


Re: Working out through individual messages in Flink

Jörn Franke
(I.e. the streamExecEnvironment.execute call at the end of your code.)


Re: Working out through individual messages in Flink

Mich Talebzadeh
In reply to this post by Jörn Franke
Hi Jörn,

Thanks, I have uploaded the Scala code to my GitHub --> md_streaming.scala


Regards,

Dr Mich Talebzadeh

 
