Should I use a Sink or Connector? Or Both?


Should I use a Sink or Connector? Or Both?

Castro, Fernando C.

Hello folks! I’m new to Flink and to data streaming in general, just an initial FYI ;)

 

I’m currently doing this successfully (condensed sketch below):

1 - streaming data from Kafka into Flink
2 - aggregating the data with Flink’s sqlQuery API
3 - outputting the result of #2 to STDOUT via toRetractStream()
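
For context, a condensed sketch of what steps 1-3 look like in my code (the complete program is linked below; the imports, Kafka properties, and the TransfersMapper class are defined there):

// Condensed from the full program; imports, `properties`, and TransfersMapper as in the complete code
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// 1 - stream data from Kafka
val streamTransfers = env
  .addSource(new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties))
  .map(new TransfersMapper())

// 2 - register a view and aggregate with sqlQuery
tEnv.createTemporaryView("transfers", tEnv.fromDataStream(streamTransfers))
val result = tEnv.sqlQuery("SELECT curr_careUnit, sum(los) FROM transfers GROUP BY curr_careUnit")

// 3 - print the retract stream to STDOUT
tEnv.toRetractStream[Row](result).print()
env.execute("demo")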

 

My objective is to change #3 so I’m upserting into an Elasticsearch index (see https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors for my complete code)

 

I’ve been following the template for the Elasticsearch connector (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector):

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")

 

But I’m confused by some old examples I’ve seen online. Should I be using the Elasticsearch Sink (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/elasticsearch.html#elasticsearch-sink) instead? Or both?

 

With the current implementation, no data is being written to Elasticsearch, yet no error is displayed in Flink (the job status is RUNNING).

 

Hoping somebody can clarify what I’m missing. Thank you in advance!

 

Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10


Re: Should I use a Sink or Connector? Or Both?

John Smith
The sink is for the DataStream API; it looks like you are using SQL and tables, so you can use the connector to output the table result to Elastic, unless you want to convert from table to stream first (see the sketch below).
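
If you did want the table-to-stream route, it would look roughly like this. This is an untested sketch based on the Elasticsearch sink docs linked in the original post; it assumes the tEnv and SQL result from that post, and the host, index, and field positions are placeholders:

// Sketch only: convert the Table result to a retract stream and write it with the
// DataStream Elasticsearch sink (flink-connector-elasticsearch7).
import java.util

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink
import org.apache.flink.types.Row
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests

val retractStream = tEnv.toRetractStream[Row](result) // DataStream[(Boolean, Row)]

val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("my-es-host", 9200, "http")) // placeholder host

val esSinkBuilder = new ElasticsearchSink.Builder[(Boolean, Row)](
  httpHosts,
  new ElasticsearchSinkFunction[(Boolean, Row)] {
    override def process(element: (Boolean, Row), ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      // element._1 is true for inserts/updates and false for retractions;
      // a real upsert would also handle the retraction case.
      if (element._1) {
        val doc = new util.HashMap[String, AnyRef]()
        doc.put("curr_careUnit", element._2.getField(0))
        doc.put("sum", element._2.getField(1))
        indexer.add(Requests.indexRequest().index("my-index").source(doc))
      }
    }
  }
)
esSinkBuilder.setBulkFlushMaxActions(1) // flush every record while testing
retractStream.addSink(esSinkBuilder.build())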



Re: Should I use a Sink or Connector? Or Both?

Jark Wu-3
John is right.

Could you provide more detailed code, so that we can help investigate?

Best,
Jark



Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Castro, Fernando C.

Thank you, guys. So I have no idea why data is not being pushed to Elasticsearch…

 

My complete code is at https://stackoverflow.com/questions/60512064/flink-is-not-adding-any-data-to-elasticsearch-but-no-errors

Btw, for some reason I still need to pass .documentType to the Elasticsearch connector descriptor (from org.apache.flink.table.descriptors.Elasticsearch), even though Elasticsearch 7 no longer uses types.

 

In case you can’t access Stack Overflow for some reason, here is the code:

/*
 * This Scala source file was generated by the Gradle 'init' task.
 */
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {

  /**
   * MapFunction to generate Transfers POJOs from parsed CSV data.
   */
  class TransfersMapper extends RichMapFunction[String, Transfers] {

    private var formatter = null

    @throws[Exception]
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      //formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    }

    @throws[Exception]
    override def map(csvLine: String): Transfers = {
      //var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
      var splitCsv = csvLine.stripLineEnd.split(",")

      val arrLength = splitCsv.length
      val i = 0
      if (arrLength != 13) {
        for (i <- arrLength + 1 to 13) {
          if (i == 13) {
            splitCsv = splitCsv :+ "0.0"
          } else {
            splitCsv = splitCsv :+ ""
          }
        }
      }

      var trans = new Transfers()
      trans.rowId = splitCsv(0)
      trans.subjectId = splitCsv(1)
      trans.hadmId = splitCsv(2)
      trans.icuStayId = splitCsv(3)
      trans.dbSource = splitCsv(4)
      trans.eventType = splitCsv(5)
      trans.prev_careUnit = splitCsv(6)
      trans.curr_careUnit = splitCsv(7)
      trans.prev_wardId = splitCsv(8)
      trans.curr_wardId = splitCsv(9)
      trans.inTime = splitCsv(10)
      trans.outTime = splitCsv(11)
      trans.los = splitCsv(12).toDouble

      return trans
    }
  }

  def main(args: Array[String]) {
    // Create streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Set properties per KafkaConsumer API
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "kafka.kafka:9092")
    properties.setProperty("group.id", "test")

    // Add Kafka source to environment
    val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
    // Read from beginning of topic
    myKConsumer.setStartFromEarliest()

    val streamSource = env
      .addSource(myKConsumer)

    // Transform CSV into a Transfers object
    val streamTransfers = streamSource.map(new TransfersMapper())

    // create a TableEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // register a Table
    val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
    tEnv.createTemporaryView("transfers", tblTransfers)

    tEnv.connect(
      new Elasticsearch()
        .version("7")
        .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http") // required: one or more Elasticsearch hosts to connect to
        .index("transfers-sum")
        .documentType("_doc") // not sure why this is still needed for ES7
        .keyNullLiteral("n/a")
    )
      .withFormat(new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
      .withSchema(new Schema()
        .field("curr_careUnit", DataTypes.STRING())
        .field("sum", DataTypes.DOUBLE())
      )
      .inUpsertMode()
      .createTemporaryTable("transfersSum")

    val result = tEnv.sqlQuery(
      """
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit
        |""".stripMargin)

    result.insertInto("transfersSum")

    tEnv.toRetractStream[Row](result).print() // Just to see if something is actually happening (and it is)

    env.execute("Flink Streaming Demo Dump to Elasticsearch")
  }
}

 

 

Thank you,

Fernando 

 

 


Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Arvid Heise-3
Hi Fernando,

How much data are you trying to write? If you just use single messages for testing, it could be that the default bulk settings are not working well.

If so, could you please adjust the following settings and report back?
public enum SinkOption {
  BULK_FLUSH_MAX_ACTIONS,
  BULK_FLUSH_MAX_SIZE,
  BULK_FLUSH_INTERVAL
}
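
On the Table API side, those options surface as bulk-flush methods on the org.apache.flink.table.descriptors.Elasticsearch descriptor. A rough sketch (method names as in the 1.10 connector docs; the values are only examples for low-volume testing):

new Elasticsearch()
  // ... version/host/index as in your program ...
  .bulkFlushMaxActions(1)    // BULK_FLUSH_MAX_ACTIONS: flush after every buffered action
  .bulkFlushMaxSize("1 mb")  // BULK_FLUSH_MAX_SIZE
  .bulkFlushInterval(1000L)  // BULK_FLUSH_INTERVAL: flush at least once a second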


Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Castro, Fernando C.

Arvid, thank you, that was it!

After setting these properties on my Elasticsearch connector, I was able to see the records upserting into ES!


.bulkFlushMaxActions(2)
.bulkFlushInterval(1000L)
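
In context, the connector descriptor from the earlier program now looks roughly like this (the two new calls chained onto the descriptor; otherwise as posted above):

new Elasticsearch()
  .version("7")
  .host("elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")
  .index("transfers-sum")
  .documentType("_doc")
  .keyNullLiteral("n/a")
  .bulkFlushMaxActions(2)   // flush once two actions are buffered
  .bulkFlushInterval(1000L) // or at least once a second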

 

Thank you,

Fernando 

 

 


Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Jark Wu-3
Hi Fernando,

Thanks for reporting back.
From my point of view, this is a shortcoming of the current Elasticsearch connector, i.e. it doesn't work out of the box.
I created FLINK-16495 [1] to improve this by adding a default flush interval.

Best,
Jark

[1] https://issues.apache.org/jira/browse/FLINK-16495

