Hi,

I have a Kafka topic that transmits 100 security prices every 2 seconds. In Spark Streaming I go through the RDD, walk through the rows one by one, and if the prices are valuable I store them in an HBase table:

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
dstream.cache()
dstream.foreachRDD { pricesRDD =>
  // Work on individual messages
  for (line <- pricesRDD.collect) {
    val columns = line._2.split(',')
    val key = columns(0)
    val ticker = columns(1)
    val timeissued = columns(2)
    val price = columns(3).toFloat
    val priceToString = columns(3)
    if (price > 90.0) {
      // save to HBase table
    }
  }
}

That works fine. In Flink I define my source as below:

val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = streamExecEnv
  .addSource(new FlinkKafkaConsumer011[String](topicsValue, new SimpleStringSchema(), properties))

Is there any way I can perform a similar operation in Flink? I need to go through every payload sent to the topic and look at the individual rows. For example, what is the equivalent of

for (line <- pricesRDD.collect.toArray)

in Flink?

Thanks,

Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
Hi, Mich:

You can write a sink function for that; a rough sketch follows below.

On Mon, Jul 30, 2018 at 2:58 PM Mich Talebzadeh <[hidden email]> wrote:
Liu, Renjie
Software Engineer, MVAD
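A minimal sketch of such a sink, assuming the comma-separated layout from the original question (key, ticker, timeissued, price); the table and column family names here are illustrative, and connection details would come from hbase-site.xml on the classpath:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes

class HBasePriceSink extends RichSinkFunction[String] {
  @transient private var connection: Connection = _
  @transient private var table: Table = _

  // open() runs once per parallel sink instance: create the HBase
  // connection here, not per record
  override def open(parameters: Configuration): Unit = {
    connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
    table = connection.getTable(TableName.valueOf("MARKETDATAHBASESPEEDFLINK"))
  }

  // invoke() is called once for every record in the stream
  override def invoke(value: String): Unit = {
    val fields = value.split(',')
    val put = new Put(Bytes.toBytes(fields(0).trim))
    put.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("TICKER"), Bytes.toBytes(fields(1).trim))
    put.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("ISSUED"), Bytes.toBytes(fields(2).trim))
    put.addColumn(Bytes.toBytes("PRICE_INFO"), Bytes.toBytes("PRICE"), Bytes.toBytes(fields(3).trim))
    table.put(put)
  }

  override def close(): Unit = {
    if (table != null) table.close()
    if (connection != null) connection.close()
  }
}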
Just to clarify, these are the individual prices separated by ','. The below shows three price lines in the topic:

UUID, Security, Time, Price
1230a9a9-dc57-40e4-a000-6ea6989c754a, MRW, 2018-07-28T20:38:43, 241.88
8143c6ca-109f-484f-be0e-9d0f5ca32f07,SAP,2018-07-28T20:38:44,56.94
81a54ff8-6ac8-470a-a522-51737d685264,VOD,2018-07-28T20:38:44,219.33

Dr Mich Talebzadeh
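Each such line splits into four fields; a small case class keeps them together (a sketch, with illustrative names):

case class Price(key: String, ticker: String, timeissued: String, price: Float)

def parsePrice(line: String): Price = {
  val f = line.split(',')
  // trim, since some lines carry a space after each comma
  Price(f(0).trim, f(1).trim, f(2).trim, f(3).trim.toFloat)
}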
On Mon, 30 Jul 2018 at 07:58, Mich Talebzadeh <[hidden email]> wrote:
Hi,

Flink processes streams record by record instead of micro-batching records together. Since every record comes by itself, there is no foreach over a collected batch. Simple record-by-record transformations can be done with a MapFunction, and records can be filtered out with a FilterFunction. You can also implement a FlatMapFunction to do both in one step. Once the stream is transformed and filtered, you can write it to HBase with a sink function.

2018-07-30 10:03 GMT+02:00 Mich Talebzadeh <[hidden email]>:
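Applied to the Kafka stream defined earlier, the Spark loop translates roughly like this (a sketch; the filter mirrors the price > 90.0 check, and the sink can split the line into columns itself):

import org.apache.flink.streaming.api.scala._

// Each record is filtered as it arrives; no collect() over a batch is needed
val highPrices: DataStream[String] = stream
  .filter(line => line.split(',')(3).trim.toFloat > 90.0f)

// a MapFunction or FlatMapFunction could equally parse each line into fields first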
Thanks Fabian. That was very useful.

How about an operation like below?

// create builder
val KafkaTableSource = Kafka011JsonTableSource.builder()
  // set Kafka topic
  .forTopic(topicsValue)
  // set Kafka consumer properties
  .withKafkaProperties(properties)
  // set Table schema
  .withSchema(TableSchema.builder()
    .field("key", Types.STRING)
    .field("ticker", Types.STRING)
    .field("timeissued", Types.STRING)
    .field("price", Types.FLOAT)
    .build())
  // build the table source
  .build()

Will that be OK?

Dr Mich Talebzadeh
On Mon, 30 Jul 2018 at 09:19, Fabian Hueske <[hidden email]> wrote:
A *Table*Source [1] is a special input connector for Flink's relational APIs (Table API and SQL) [2]. You can transform and filter with these APIs as well (it's probably even easier). In SQL this would be the SELECT and WHERE clauses of a query.

However, there is no *Table*Sink for HBase, so you would need to convert the Table back into a DataStream [3]. That's not very difficult since the APIs are integrated with each other.

Best, Fabian

2018-07-30 10:47 GMT+02:00 Mich Talebzadeh <[hidden email]>:
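For illustration, the round trip could look roughly like this (a sketch; it assumes the table source built above is registered under the name priceTable, and uses sqlQuery/toAppendStream from the newer Table API):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerTableSource("priceTable", KafkaTableSource)

// SELECT and WHERE do the transformation and filtering ...
val result = tableEnv.sqlQuery(
  "SELECT `key`, ticker, timeissued, price FROM priceTable WHERE price > 90.0")

// ... then the Table is converted back to a DataStream for a custom HBase sink
val rows: DataStream[Row] = result.toAppendStream[Row]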
Thanks again.

The HBase connector works fine in Flink:

// Start HBase table stuff
val tableName = "MARKETDATAHBASESPEEDFLINK"
val hbaseConf = HBaseConfiguration.create()
// Connecting to remote HBase
hbaseConf.set("hbase.master", hbaseHost)
hbaseConf.set("hbase.zookeeper.quorum", zookeeperHost)
hbaseConf.set("hbase.zookeeper.property.clientPort", zooKeeperClientPort)
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
// create this table with column families
val admin = new HBaseAdmin(hbaseConf)
if (!admin.isTableAvailable(tableName)) {
  println("Creating table " + tableName)
  val tableDesc = new HTableDescriptor(tableName)
  tableDesc.addFamily(new HColumnDescriptor("PRICE_INFO".getBytes()))
  tableDesc.addFamily(new HColumnDescriptor("OPERATION".getBytes()))
  admin.createTable(tableDesc)
} else {
  println("Table " + tableName + " already exists!!")
}
val HbaseTable = new HTable(hbaseConf, tableName)
// End HBase table stuff

So I just need to split every row into columns and put it into HBase as follows:

// Save prices to HBase table
val p = new Put(key.getBytes())
p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), ticker.getBytes())
p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), timeissued.getBytes())
p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), priceToString.getBytes())
p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), CURRENCY.getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), "1".getBytes())
p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), System.currentTimeMillis.toString.getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()

Dr Mich Talebzadeh
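In a streaming job, that per-record block would live inside a sink's invoke() so it runs on the workers for each record, along the lines of the sink sketched earlier in this thread; attaching it is then a single line (sketch, reusing names from the earlier examples):

highPrices.addSink(new HBasePriceSink)

One caveat: calling flushCommits() after every Put disables HBase's client-side write buffering; flushing in the sink's close() or at intervals is cheaper.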
On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <[hidden email]> wrote:
Hi Fabian,

Reading your notes above, I have converted the table back to a DataStream:

val tableEnv = TableEnvironment.getTableEnvironment(streamExecEnv)
tableEnv.registerDataStream("priceTable", splitStream, 'key, 'ticker, 'timeissued, 'price)
val key = tableEnv.scan("priceTable").select('key).toDataStream[Row]
val ticker = tableEnv.scan("priceTable").select('ticker).toDataStream[Row]
val timeissued = tableEnv.scan("priceTable").select('timeissued).toDataStream[Row]
val price = tableEnv.scan("priceTable").select('price).toDataStream[Row]

My intention is to create an HBase sink as follows:

// Save prices to HBase table
var p = new Put(new String(key).getBytes())
p.add("PRICE_INFO".getBytes(), "TICKER".getBytes(), new String(ticker).getBytes())
p.add("PRICE_INFO".getBytes(), "ISSUED".getBytes(), new String(timeissued).getBytes())
p.add("PRICE_INFO".getBytes(), "PRICE".getBytes(), new String(priceToString).getBytes())
p.add("PRICE_INFO".getBytes(), "CURRENCY".getBytes(), new String(CURRENCY).getBytes())
p.add("OPERATION".getBytes(), "OP_TYPE".getBytes(), new String(1.toString).getBytes())
p.add("OPERATION".getBytes(), "OP_TIME".getBytes(), new String(System.currentTimeMillis.toString).getBytes())
HbaseTable.put(p)
HbaseTable.flushCommits()

However, I don't seem to be able to get the correct values for the columns!

Dr Mich Talebzadeh
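Worth noting: each select above produces its own DataStream[Row], so key, ticker, timeissued, and price end up as four separate streams rather than per-record values. A sketch that keeps the fields of each row together (names follow the code above; toAppendStream is the newer name for toDataStream):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val prices: DataStream[Row] = tableEnv
  .scan("priceTable")
  .select('key, 'ticker, 'timeissued, 'price)
  .toAppendStream[Row]

// Inside a sink's invoke(row: Row), the fields come out by position:
//   row.getField(0).toString  -> key
//   row.getField(3)           -> price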
On Mon, 30 Jul 2018 at 09:58, Fabian Hueske <[hidden email]> wrote:
Hi Mich,

Would it be possible to share the full source code? I am missing a call to streamExecEnvironment.execute.

Best regards
(At the end of your code)
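Flink builds the dataflow lazily, so nothing runs until execute is called at the end of the program, e.g. (the job name is arbitrary):

streamExecEnv.execute("md_streaming")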
Hi Jörn,

Thanks. I uploaded the Scala code to my GitHub --> md_streaming.scala

Regards,

Dr Mich Talebzadeh
On Tue, 7 Aug 2018 at 23:29, Jörn Franke <[hidden email]> wrote: