Should I use a Sink or Connector? Or Both?

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view

Should I use a Sink or Connector? Or Both?

Castro, Fernando C.

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)


I’m currently doing this successfully:

1 - streaming data from Kafka in Flink

2 - aggregating the data with Flink’s sqlQuery API

3 - outputting the result of #2 into STDOUT via toRetreatStream()


My objective is to change #3 so I’m upserting into an Elasticsearch index (see for my complete code)


I’ve been using the template for the Elasticsearch connector








By I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink ( instead? Or both?


I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).


Hoping somebody could clarify what I’m missing? Thank you in advance!


Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Reply | Threaded
Open this post in threaded view

Re: Should I use a Sink or Connector? Or Both?

John Smith
The sink if for Streaming API, it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <[hidden email]> wrote:

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)


I’m currently doing this successfully:

1 - streaming data from Kafka in Flink

2 - aggregating the data with Flink’s sqlQuery API

3 - outputting the result of #2 into STDOUT via toRetreatStream()


My objective is to change #3 so I’m upserting into an Elasticsearch index (see for my complete code)


I’ve been using the template for the Elasticsearch connector








By I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink ( instead? Or both?


I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).


Hoping somebody could clarify what I’m missing? Thank you in advance!


Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Reply | Threaded
Open this post in threaded view

Re: Should I use a Sink or Connector? Or Both?

Jark Wu-3
John is right.

Could you provide more detailed code? So that we can help to investigate.


On Wed, 4 Mar 2020 at 06:20, John Smith <[hidden email]> wrote:
The sink if for Streaming API, it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.

On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <[hidden email]> wrote:

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)


I’m currently doing this successfully:

1 - streaming data from Kafka in Flink

2 - aggregating the data with Flink’s sqlQuery API

3 - outputting the result of #2 into STDOUT via toRetreatStream()


My objective is to change #3 so I’m upserting into an Elasticsearch index (see for my complete code)


I’ve been using the template for the Elasticsearch connector








By I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink ( instead? Or both?


I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).


Hoping somebody could clarify what I’m missing? Thank you in advance!


Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Reply | Threaded
Open this post in threaded view

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Castro, Fernando C.

Thank you guys. So I have no idea of why data is not being pushed to Elasticsearch…


My complete code is at

Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t do types anymore.


In case you can’t access stackoverflow for some reason, here is the code below too:

* This Scala source file was generated by the Gradle 'init' task.
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
   * MapFunction to generate Transfers POJOs from parsed CSV data.
class TransfersMapper extends RichMapFunction[String, Transfers] {
private var formatter = null

override def open(parameters: Configuration): Unit = {
//formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

override def map(csvLine: String): Transfers = {
//var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
var splitCsv = csvLine.stripLineEnd.split(",")

val arrLength = splitCsv.length
val i = 0
if (arrLength != 13) {
for (i <- arrLength + 1 to 13) {
if (i == 13) {
            splitCsv = splitCsv :+
} else {
            splitCsv = splitCsv :+
var trans = new Transfers()
rowId = splitCsv(0)
subjectId = splitCsv(1)
hadmId = splitCsv(2)
icuStayId = splitCsv(3)
dbSource = splitCsv(4)
eventType = splitCsv(5)
prev_careUnit = splitCsv(6)
curr_careUnit = splitCsv(7)
prev_wardId = splitCsv(8)
curr_wardId = splitCsv(9)
inTime = splitCsv(10)
outTime = splitCsv(11)
los = splitCsv(12).toDouble

return trans

def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Set properties per KafkaConsumer API
val properties = new Properties()
"bootstrap.servers", "kafka.kafka:9092")
"", "test")

// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
// Read from beginning of topic

val streamSource = env

// Transform CSV into a Transfers object
val streamTransfers = TransfersMapper())

// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
"transfers", tblTransfers)

new Elasticsearch()
"elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
"_doc") // not sure why this is still needed for ES7
new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
new Schema()
"curr_careUnit", DataTypes.STRING())
"sum", DataTypes.DOUBLE())

val result = tEnv.sqlQuery(
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit


//Just to see if something is actually happening (and it is)

env.execute("Flink Streaming Demo Dump to Elasticsearch")



Thank you,




From: Jark Wu <[hidden email]>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <[hidden email]>
Cc: "Castro, Fernando C. [US-US]" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?


John is right.


Could you provide more detailed code? So that we can help to investigate.





On Wed, 4 Mar 2020 at 06:20, John Smith <[hidden email]> wrote:

The sink if for Streaming API, it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.


On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <[hidden email]> wrote:

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)


I’m currently doing this successfully:

1 - streaming data from Kafka in Flink

2 - aggregating the data with Flink’s sqlQuery API

3 - outputting the result of #2 into STDOUT via toRetreatStream()


My objective is to change #3 so I’m upserting into an Elasticsearch index (see for my complete code)


I’ve been using the template for the Elasticsearch connector








By I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink ( instead? Or both?


I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).


Hoping somebody could clarify what I’m missing? Thank you in advance!


Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Reply | Threaded
Open this post in threaded view

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Arvid Heise-3
Hi Fernando,

How much data are you trying to write? If you just use single messages for testing, it could be that the default bulk settings are not working well.

If so, could you please adjust the following settings and report back?
public enum SinkOption {

On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <[hidden email]> wrote:

Thank you guys. So I have no idea of why data is not being pushed to Elasticsearch…


My complete code is at

Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t do types anymore.


In case you can’t access stackoverflow for some reason, here is the code below too:

* This Scala source file was generated by the Gradle 'init' task.
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
   * MapFunction to generate Transfers POJOs from parsed CSV data.
class TransfersMapper extends RichMapFunction[String, Transfers] {
private var formatter = null

override def open(parameters: Configuration): Unit = {
//formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

override def map(csvLine: String): Transfers = {
//var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
var splitCsv = csvLine.stripLineEnd.split(",")

val arrLength = splitCsv.length
val i = 0
if (arrLength != 13) {
for (i <- arrLength + 1 to 13) {
if (i == 13) {
            splitCsv = splitCsv :+
} else {
            splitCsv = splitCsv :+
var trans = new Transfers()
rowId = splitCsv(0)
subjectId = splitCsv(1)
hadmId = splitCsv(2)
icuStayId = splitCsv(3)
dbSource = splitCsv(4)
eventType = splitCsv(5)
prev_careUnit = splitCsv(6)
curr_careUnit = splitCsv(7)
prev_wardId = splitCsv(8)
curr_wardId = splitCsv(9)
inTime = splitCsv(10)
outTime = splitCsv(11)
los = splitCsv(12).toDouble

return trans

def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Set properties per KafkaConsumer API
val properties = new Properties()
"bootstrap.servers", "kafka.kafka:9092")
"", "test")

// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
// Read from beginning of topic

val streamSource = env

// Transform CSV into a Transfers object
val streamTransfers = TransfersMapper())

// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
"transfers", tblTransfers)

new Elasticsearch()
"elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
"_doc") // not sure why this is still needed for ES7
new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
new Schema()
"curr_careUnit", DataTypes.STRING())
"sum", DataTypes.DOUBLE())

val result = tEnv.sqlQuery(
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit


//Just to see if something is actually happening (and it is)

env.execute("Flink Streaming Demo Dump to Elasticsearch")



Thank you,




From: Jark Wu <[hidden email]>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <[hidden email]>
Cc: "Castro, Fernando C. [US-US]" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?


John is right.


Could you provide more detailed code? So that we can help to investigate.





On Wed, 4 Mar 2020 at 06:20, John Smith <[hidden email]> wrote:

The sink if for Streaming API, it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.


On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <[hidden email]> wrote:

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)


I’m currently doing this successfully:

1 - streaming data from Kafka in Flink

2 - aggregating the data with Flink’s sqlQuery API

3 - outputting the result of #2 into STDOUT via toRetreatStream()


My objective is to change #3 so I’m upserting into an Elasticsearch index (see for my complete code)


I’ve been using the template for the Elasticsearch connector








By I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink ( instead? Or both?


I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).


Hoping somebody could clarify what I’m missing? Thank you in advance!


Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Reply | Threaded
Open this post in threaded view

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Castro, Fernando C.

Arvid, thank you that was it!

After setting these properties to my Elasticsearch connector, I was able to see the records upserting into ES!



Thank you,




From: Arvid Heise <[hidden email]>
Date: Thursday, March 5, 2020 at 2:27 AM
To: "Castro, Fernando C. [US-US]" <[hidden email]>
Cc: Jark Wu <[hidden email]>, John Smith <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?


Hi Fernando,


How much data are you trying to write? If you just use single messages for testing, it could be that the default bulk settings are not working well.


If so, could you please adjust the following settings and report back?

public enum SinkOption {


On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <[hidden email]> wrote:

Thank you guys. So I have no idea of why data is not being pushed to Elasticsearch…


My complete code is at

Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t do types anymore.


In case you can’t access stackoverflow for some reason, here is the code below too:

* This Scala source file was generated by the Gradle 'init' task.
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
   * MapFunction to generate Transfers POJOs from parsed CSV data.
class TransfersMapper extends RichMapFunction[String, Transfers] {
private var formatter = null

override def open(parameters: Configuration): Unit = {
//formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

override def map(csvLine: String): Transfers = {
//var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
var splitCsv = csvLine.stripLineEnd.split(",")

val arrLength = splitCsv.length
val i = 0
if (arrLength != 13) {
for (i <- arrLength + 1 to 13) {
if (i == 13) {
            splitCsv = splitCsv :+
} else {
            splitCsv = splitCsv :+
var trans = new Transfers()
rowId = splitCsv(0)
subjectId = splitCsv(1)
hadmId = splitCsv(2)
icuStayId = splitCsv(3)
dbSource = splitCsv(4)
eventType = splitCsv(5)
prev_careUnit = splitCsv(6)
curr_careUnit = splitCsv(7)
prev_wardId = splitCsv(8)
curr_wardId = splitCsv(9)
inTime = splitCsv(10)
outTime = splitCsv(11)
los = splitCsv(12).toDouble

return trans

def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Set properties per KafkaConsumer API
val properties = new Properties()
"bootstrap.servers", "kafka.kafka:9092")
"", "test")

// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
// Read from beginning of topic

val streamSource = env

// Transform CSV into a Transfers object
val streamTransfers = TransfersMapper())

// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
"transfers", tblTransfers)

new Elasticsearch()
"elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
"_doc") // not sure why this is still needed for ES7
new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
new Schema()
"curr_careUnit", DataTypes.STRING())
"sum", DataTypes.DOUBLE())

val result = tEnv.sqlQuery(
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit


//Just to see if something is actually happening (and it is)

env.execute("Flink Streaming Demo Dump to Elasticsearch")



Thank you,




From: Jark Wu <[hidden email]>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <[hidden email]>
Cc: "Castro, Fernando C. [US-US]" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?


John is right.


Could you provide more detailed code? So that we can help to investigate.





On Wed, 4 Mar 2020 at 06:20, John Smith <[hidden email]> wrote:

The sink if for Streaming API, it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.


On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <[hidden email]> wrote:

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)


I’m currently doing this successfully:

1 - streaming data from Kafka in Flink

2 - aggregating the data with Flink’s sqlQuery API

3 - outputting the result of #2 into STDOUT via toRetreatStream()


My objective is to change #3 so I’m upserting into an Elasticsearch index (see for my complete code)


I’ve been using the template for the Elasticsearch connector








By I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink ( instead? Or both?


I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).


Hoping somebody could clarify what I’m missing? Thank you in advance!


Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10

Reply | Threaded
Open this post in threaded view

Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?

Jark Wu-3
Hi Fernando,

Thanks for reporting back. 
From my point of view, this is a short-comming of current elasticsearch connector, i.e. out-of-box doesn't work.
I created FLINK-16495 [1] to improve this to have a default flush interval. 


On Sat, 7 Mar 2020 at 00:20, Castro, Fernando C. <[hidden email]> wrote:

Arvid, thank you that was it!

After setting these properties to my Elasticsearch connector, I was able to see the records upserting into ES!



Thank you,




From: Arvid Heise <[hidden email]>
Date: Thursday, March 5, 2020 at 2:27 AM
To: "Castro, Fernando C. [US-US]" <[hidden email]>
Cc: Jark Wu <[hidden email]>, John Smith <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?


Hi Fernando,


How much data are you trying to write? If you just use single messages for testing, it could be that the default bulk settings are not working well.


If so, could you please adjust the following settings and report back?

public enum SinkOption {


On Wed, Mar 4, 2020 at 3:05 PM Castro, Fernando C. <[hidden email]> wrote:

Thank you guys. So I have no idea of why data is not being pushed to Elasticsearch…


My complete code is at

Btw, for some reason I still need to pass .documentType to the Elasticsearch connection descriptor (getting it from org.apache.flink.table.descriptors.Elasticsearch), when Elasticsearch 7 doesn’t do types anymore.


In case you can’t access stackoverflow for some reason, here is the code below too:

* This Scala source file was generated by the Gradle 'init' task.
package flinkNamePull

import java.time.LocalDateTime
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.descriptors.{Elasticsearch, Json, Schema}
import org.apache.flink.types.Row

object Demo {
   * MapFunction to generate Transfers POJOs from parsed CSV data.
class TransfersMapper extends RichMapFunction[String, Transfers] {
private var formatter = null

override def open(parameters: Configuration): Unit = {
//formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

override def map(csvLine: String): Transfers = {
//var splitCsv = csvLine.stripLineEnd.split("\n")(1).split(",")
var splitCsv = csvLine.stripLineEnd.split(",")

val arrLength = splitCsv.length
val i = 0
if (arrLength != 13) {
for (i <- arrLength + 1 to 13) {
if (i == 13) {
            splitCsv = splitCsv :+
} else {
            splitCsv = splitCsv :+
var trans = new Transfers()
rowId = splitCsv(0)
subjectId = splitCsv(1)
hadmId = splitCsv(2)
icuStayId = splitCsv(3)
dbSource = splitCsv(4)
eventType = splitCsv(5)
prev_careUnit = splitCsv(6)
curr_careUnit = splitCsv(7)
prev_wardId = splitCsv(8)
curr_wardId = splitCsv(9)
inTime = splitCsv(10)
outTime = splitCsv(11)
los = splitCsv(12).toDouble

return trans

def main(args: Array[String]) {
// Create streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Set properties per KafkaConsumer API
val properties = new Properties()
"bootstrap.servers", "kafka.kafka:9092")
"", "test")

// Add Kafka source to environment
val myKConsumer = new FlinkKafkaConsumer010[String]("raw.data3", new SimpleStringSchema(), properties)
// Read from beginning of topic

val streamSource = env

// Transform CSV into a Transfers object
val streamTransfers = TransfersMapper())

// create a TableEnvironment
val tEnv = StreamTableEnvironment.create(env)

// register a Table
val tblTransfers: Table = tEnv.fromDataStream(streamTransfers)
"transfers", tblTransfers)

new Elasticsearch()
"elasticsearch-elasticsearch-coordinating-only.default.svc.cluster.local", 9200, "http")   // required: one or more Elasticsearch hosts to connect to
"_doc") // not sure why this is still needed for ES7
new Json().jsonSchema("{type: 'object', properties: {curr_careUnit: {type: 'string'}, sum: {type: 'number'}}}"))
new Schema()
"curr_careUnit", DataTypes.STRING())
"sum", DataTypes.DOUBLE())

val result = tEnv.sqlQuery(
        |SELECT curr_careUnit, sum(los)
        |FROM transfers
        |GROUP BY curr_careUnit


//Just to see if something is actually happening (and it is)

env.execute("Flink Streaming Demo Dump to Elasticsearch")



Thank you,




From: Jark Wu <[hidden email]>
Date: Tuesday, March 3, 2020 at 8:51 PM
To: John Smith <[hidden email]>
Cc: "Castro, Fernando C. [US-US]" <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: EXTERNAL: Re: Should I use a Sink or Connector? Or Both?


John is right.


Could you provide more detailed code? So that we can help to investigate.





On Wed, 4 Mar 2020 at 06:20, John Smith <[hidden email]> wrote:

The sink if for Streaming API, it looks like you are using SQL and tables. So you can use the connector to output the table result to Elastic. Unless you want to convert from table to stream first.


On Tue, 3 Mar 2020 at 16:25, Castro, Fernando C. <[hidden email]> wrote:

Hello folks! I’m new to Flink and data streaming in general, just initial FYI ;)


I’m currently doing this successfully:

1 - streaming data from Kafka in Flink

2 - aggregating the data with Flink’s sqlQuery API

3 - outputting the result of #2 into STDOUT via toRetreatStream()


My objective is to change #3 so I’m upserting into an Elasticsearch index (see for my complete code)


I’ve been using the template for the Elasticsearch connector








By I’m confused from seeing some old examples online. Should I be using the Elasticsearch Sink ( instead? Or both?


I’m having trouble with the current implementation where no data is outputting to Elasticsearch, but no error is being displayed in Flink (job status is RUNNING).


Hoping somebody could clarify what I’m missing? Thank you in advance!


Note: Running on Scala 2.11, Elasticsearch 7, Flink 1.10