LEGACY('STRUCTURED_TYPE' to pojo


Rex Fenley
Hello,

I keep running into trouble moving between DataStream and SQL with POJOs because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...). Is there any way to convert them back to POJOs in Flink when converting a SQL Table back to a DataStream?
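
For example, roughly (a simplified sketch, not my actual job code):

case class ProductItem(name: String, id: Long)
case class Order(user: Long, product: ProductItem, amount: Int)

val orders: DataStream[Order] = env.fromCollection(
  Seq(Order(1L, ProductItem("beer", 10L), 3))
)

// DataStream -> Table: the nested case class shows up as
// LEGACY('STRUCTURED_TYPE', ...) in the table schema.
val table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)

// Table -> DataStream: this is where I'd like the nested case class back.
val back: DataStream[Order] = table.toAppendStream[Order]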

Thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Re: LEGACY('STRUCTURED_TYPE' to pojo

Rex Fenley
Correction: I'm using Scala case classes, not strictly Java POJOs, just to be clear.



Re: LEGACY('STRUCTURED_TYPE' to pojo

Rex Fenley
Maybe this is related to this issue? https://issues.apache.org/jira/browse/FLINK-17683



Re: LEGACY('STRUCTURED_TYPE' to pojo

Aljoscha Krettek
@Timo: Is this something that would work when using the new type stack? From
the message I'm assuming it's using the older type stack.

@Rex: Which Flink version are you using, and could you maybe post the
code snippet that you use to do the conversions?

Best,
Aljoscha



Re: LEGACY('STRUCTURED_TYPE' to pojo

Rex Fenley
Flink 1.11.2 with Scala 2.12

Error:
[info] JobScalaTest:
[info] - dummy *** FAILED ***
[info]   org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink  do not match.
[info] Query schema: [user: BIGINT, product: ROW<`name` VARCHAR(2147483647), `id` BIGINT>, amount: INT]
[info] Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem, rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25
Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzdFNlcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'), amount: INT]
[info]   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
[info]   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
[info]   at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
[info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
[info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
[info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
[info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

Code:
import com.remind.graph.people.PeopleJobScala

import org.scalatest.funsuite._
import org.scalatest.BeforeAndAfter

import org.apache.flink.streaming.api.scala.{
  DataStream,
  StreamExecutionEnvironment
}
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.table.runtime.util._
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.types.Row

import java.io.Serializable;
import java.sql.Timestamp;
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

case class Order(user: Long, product: ProductItem, amount: Int) {
  def this() {
    this(0, null, 0)
  }

  override def toString(): String = {
    return "Order{" +
      "user=" + user +
      ", product='" + product + '\'' +
      ", amount=" + amount +
      '}';
  }
}

case class ProductItem(name: String, id: Long) {
  def this() {
    this(null, 0)
  }

  override def toString(): String = {
    return "Product{" +
      "name='" + name + '\'' +
      ", id=" + id +
      '}';
  }
}

class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  before {
    this.env = StreamExecutionEnvironment.getExecutionEnvironment
    this.env.setParallelism(2)
    this.env.getConfig.enableObjectReuse()
    val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
    this.tEnv = StreamTableEnvironment.create(env, setting)
  }

  after {
    StreamTestSink.clear()
    // TestValuesTableFactory.clearAllData()
  }

  def dateFrom(stringDate: String): java.sql.Date = {
    val date = new SimpleDateFormat("dd/MM/yyyy")
      .parse(stringDate)
    return new java.sql.Date(date.getTime())
  }

  def printTable(table: Table) = {
    println(table)
    table.printSchema()
    println(table.getSchema().getFieldNames().mkString(", "))
  }

  def printDataStream(dataStream: DataStream[_]) = {
    println(dataStream)
    println(dataStream.dataType)
  }

  test("dummy") {
    val orderA: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(1L, new ProductItem("beer", 10L), 3),
        new Order(1L, new ProductItem("diaper", 11L), 4),
        new Order(3L, new ProductItem("rubber", 12L), 2)
      )
    )

    val orderB: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(2L, new ProductItem("pen", 13L), 3),
        new Order(2L, new ProductItem("rubber", 12L), 3),
        new Order(4L, new ProductItem("beer", 10L), 1)
      )
    )

    println(orderB)
    println(orderB.dataType)

    // convert DataStream to Table
    val tableA =
      this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    println(tableA)
    tableA.printSchema()
    println(tableA.getSchema().getFieldNames().mkString(", "))
    // register DataStream as Table
    this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = this.tEnv.sqlQuery(s"""
      |SELECT * FROM $tableA WHERE amount > 2
      |UNION ALL
      |SELECT * FROM OrderB WHERE amount < 2
      """.stripMargin)

    val sink = new StringSink[Order]()
    result.toAppendStream[Order].addSink(sink)

    this.env.execute()

    val expected = List(
      "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
      "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
      "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
    )
    val results = sink.getResults.sorted
    println("results")
    println(results)
    assert(expected.sorted === results)
  }
}

/**
 * Taken from:
 * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
 * There's a whole bunch of other test sinks to choose from there.
 */
object StreamTestSink {

  val idCounter: AtomicInteger = new AtomicInteger(0)

  val globalResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalRetractResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalUpsertResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]

  def getNewSinkId: Int = {
    val idx = idCounter.getAndIncrement()
    this.synchronized {
      globalResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
      )
      globalRetractResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
      )
      globalUpsertResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.Map[String, String]]
      )
    }
    idx
  }

  def clear(): Unit = {
    globalResults.clear()
    globalRetractResults.clear()
    globalUpsertResults.clear()
  }
}

abstract class AbstractExactlyOnceSink[T]
    extends RichSinkFunction[T]
    with CheckpointedFunction {
  protected var resultsState: ListState[String] = _
  protected var localResults: mutable.ArrayBuffer[String] = _
  protected val idx: Int = StreamTestSink.getNewSinkId

  protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalRetractResults
      : mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalUpsertResults
      : mutable.Map[Int, mutable.Map[String, String]] = _

  def isInitialized: Boolean = globalResults != null

  override def initializeState(context: FunctionInitializationContext): Unit = {
    resultsState = context.getOperatorStateStore
      .getListState(
        new ListStateDescriptor[String]("sink-results", Types.STRING)
      )

    localResults = mutable.ArrayBuffer.empty[String]

    if (context.isRestored) {
      for (value <- resultsState.get().asScala) {
        localResults += value
      }
    }

    val taskId = getRuntimeContext.getIndexOfThisSubtask
    StreamTestSink.synchronized(
      StreamTestSink.globalResults(idx) += (taskId -> localResults)
    )
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    resultsState.clear()
    for (value <- localResults) {
      resultsState.add(value)
    }
  }

  protected def clearAndStashGlobalResults(): Unit = {
    if (globalResults == null) {
      StreamTestSink.synchronized {
        globalResults = StreamTestSink.globalResults.remove(idx).get
        globalRetractResults =
          StreamTestSink.globalRetractResults.remove(idx).get
        globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
      }
    }
  }

  protected def getResults: List[String] = {
    clearAndStashGlobalResults()
    val result = mutable.ArrayBuffer.empty[String]
    this.globalResults.foreach {
      case (_, list) => result ++= list
    }
    result.toList
  }
}

final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
  override def invoke(value: T) {
    localResults += value.toString
  }

  override def getResults: List[String] = super.getResults
}





Re: LEGACY('STRUCTURED_TYPE' to pojo

Rex Fenley
My jobs normally use the Blink planner, but I noticed that may not be the case with this test.
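
If it helps, pinning the planner explicitly in the test setup should look roughly like this (untested sketch using the Flink 1.11 EnvironmentSettings builder):

// in the test's `before` block
val setting = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
this.tEnv = StreamTableEnvironment.create(env, setting)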



Re: LEGACY('STRUCTURED_TYPE' to pojo

Timo Walther
Hi Rex,

Sorry for the late reply. POJOs will have much better support in the
upcoming Flink versions because they have been fully integrated with the
new table type system mentioned in FLIP-37 [1] (e.g. support for
immutable POJOs, nested DataTypeHints, etc.).

For queries, scalar functions, and table functions you can already use
full POJOs within the table ecosystem.

However, the only missing piece is the new translation of POJOs from the
Table API to the DataStream API. This will be fixed in FLIP-136 [2]. Until
then I would recommend either using `Row` as the output of the Table API
or using a scalar function beforehand that maps to the desired data
structure.
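
Something like the following (an untested sketch against the test code you
posted; it maps the nested ROW back into your case classes by hand):

// take Rows out of the Table and rebuild the case classes manually
val rows: DataStream[Row] = result.toAppendStream[Row]
val orders: DataStream[Order] = rows.map { row =>
  val product = row.getField(1).asInstanceOf[Row] // nested ROW<`name`, `id`>
  Order(
    row.getField(0).asInstanceOf[Long],
    ProductItem(
      product.getField(0).asInstanceOf[String],
      product.getField(1).asInstanceOf[Long]
    ),
    row.getField(2).asInstanceOf[Int]
  )
}
orders.addSink(sink)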

I hope this helps a bit.

Regards,
Timo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API



Re: LEGACY('STRUCTURED_TYPE' to pojo

Rex Fenley
Thank you for the info!

Is there a timetable for when the next version with this change might be released?

On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <[hidden email]> wrote:
Hi Rex,

sorry for the late reply. POJOs will have much better support in the
upcoming Flink versions because they have been fully integrated with the
new table type system mentioned in FLIP-37 [1] (e.g. support for
immutable POJOs and nested DataTypeHints etc).

For queries, scalar, and table functions you can already use the full
POJOs within the table ecosystem.

However, the only missing piece is the new translation of POJOs from
Table API to DataStream API. This will be fixed in FLIP-136 [2]. Until
then I would recommend to either use `Row` as the output of the table
API or try to use a scalar function before that maps to the desired data
structure.

I hope this helps a bit.

Regards,
Timo

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API

On 02.11.20 21:44, Rex Fenley wrote:
> My jobs normally use the blink planner, I noticed with this test that
> may not be the case.
>
> On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Flink 1.11.2 with Scala 2.12
>
>     Error:
>     [info] JobScalaTest:
>     [info] - dummy *** FAILED ***
>     [info]   org.apache.flink.table.api.ValidationException: Field types
>     of query result and registered TableSink  do not match.
>     [info] Query schema: [user: BIGINT, product: ROW<`name`
>     VARCHAR(2147483647), `id` BIGINT>, amount: INT]
>     [info] Sink schema: [user: BIGINT, product:
>     LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem,
>     rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuT
GlzdFNlcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
>     amount: INT]
>     [info]   at
>     org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
>     [info]   at
>     org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
>     [info]   at
>     org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
>     [info]   at
>     scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>     [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
>     [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     [info]   at
>     scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     [info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     [info]   at
>     scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     [info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>
>     Code:
>     import com.remind.graph.people.PeopleJobScala
>
>     import org.scalatest.funsuite._
>     import org.scalatest.BeforeAndAfter
>
>     import org.apache.flink.streaming.api.scala.{
>     DataStream,
>     StreamExecutionEnvironment
>     }
>     import org.apache.flink.streaming.util.TestStreamEnvironment
>     import org.apache.flink.table.runtime.util._
>     import org.apache.flink.test.util.AbstractTestBase
>     import org.apache.flink.table.api._
>     import org.apache.flink.table.api.bridge.scala._
>     import org.apache.flink.streaming.api.scala._
>     import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
>     import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
>     import org.apache.flink.api.common.state.ListState
>     import org.apache.flink.runtime.state.FunctionInitializationContext
>     import org.apache.flink.api.common.state.ListStateDescriptor
>     import org.apache.flink.runtime.state.FunctionSnapshotContext
>     import org.apache.flink.types.Row
>
>     import java.io.Serializable;
>     import java.sql.Timestamp;
>     import java.text.SimpleDateFormat
>     import java.util.concurrent.atomic.AtomicInteger
>     import java.{util => ju}
>
>     import scala.collection.JavaConverters._
>     import scala.collection.mutable
>     import scala.util.Try
>
>     caseclassOrder(user: Long, product: ProductItem, amount: Int) {
>     defthis() {
>     this(0, null, 0)
>     }
>
>     overridedeftoString(): String = {
>     return"Order{"+
>     "user="+ user +
>     ", product='"+ product + '\''+
>     ", amount="+ amount +
>     '}';
>     }
>     }
>
>     caseclassProductItem(name: String, id: Long) {
>     defthis() {
>     this(null, 0)
>     }
>
>     overridedeftoString(): String = {
>     return"Product{"+
>     "name='"+ name + '\''+
>     ", id="+ id +
>     '}';
>     }
>     }
>
>     class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
>       var env: StreamExecutionEnvironment = _
>       var tEnv: StreamTableEnvironment = _
>
>       before {
>         this.env = StreamExecutionEnvironment.getExecutionEnvironment
>         this.env.setParallelism(2)
>         this.env.getConfig.enableObjectReuse()
>         val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
>         this.tEnv = StreamTableEnvironment.create(env, setting)
>       }
>
>       after {
>         StreamTestSink.clear()
>         // TestValuesTableFactory.clearAllData()
>       }
>
>       def dateFrom(stringDate: String): java.sql.Date = {
>         val date = new SimpleDateFormat("dd/MM/yyyy")
>           .parse(stringDate)
>         return new java.sql.Date(date.getTime())
>       }
>
>       def printTable(table: Table) = {
>         println(table)
>         table.printSchema()
>         println(table.getSchema().getFieldNames().mkString(", "))
>       }
>
>       def printDataStream(dataStream: DataStream[_]) = {
>         println(dataStream)
>         println(dataStream.dataType)
>       }
>
>       test("dummy") {
>         val orderA: DataStream[Order] = this.env.fromCollection(
>           Seq(
>             new Order(1L, new ProductItem("beer", 10L), 3),
>             new Order(1L, new ProductItem("diaper", 11L), 4),
>             new Order(3L, new ProductItem("rubber", 12L), 2)
>           )
>         )
>
>         val orderB: DataStream[Order] = this.env.fromCollection(
>           Seq(
>             new Order(2L, new ProductItem("pen", 13L), 3),
>             new Order(2L, new ProductItem("rubber", 12L), 3),
>             new Order(4L, new ProductItem("beer", 10L), 1)
>           )
>         )
>
>         println(orderB)
>         println(orderB.dataType)
>
>         // convert DataStream to Table
>         val tableA =
>           this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
>         println(tableA)
>         tableA.printSchema()
>         println(tableA.getSchema().getFieldNames().mkString(", "))
>         // register DataStream as Table
>         this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product,
>           'amount)
>
>         // union the two tables
>         val result = this.tEnv.sqlQuery(s"""
>           |SELECT * FROM $tableA WHERE amount > 2
>           |UNION ALL
>           |SELECT * FROM OrderB WHERE amount < 2
>           """.stripMargin)
>
>         val sink = new StringSink[Order]()
>         result.toAppendStream[Order].addSink(sink)
>
>         this.env.execute()
>
>         val expected = List(
>           "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
>           "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
>           "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
>         )
>         val results = sink.getResults.sorted
>         println("results")
>         println(results)
>         assert(expected.sorted === results)
>       }
>     }
>
>     /**
>      * Taken from:
>      * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
>      * There's a whole bunch of other test sinks to choose from there.
>      */
>     object StreamTestSink {
>
>       val idCounter: AtomicInteger = new AtomicInteger(0)
>
>       val globalResults =
>         mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>       val globalRetractResults =
>         mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>       val globalUpsertResults =
>         mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
>
>       def getNewSinkId: Int = {
>         val idx = idCounter.getAndIncrement()
>         this.synchronized {
>           globalResults.put(
>             idx,
>             mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>           )
>           globalRetractResults.put(
>             idx,
>             mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>           )
>           globalUpsertResults.put(
>             idx,
>             mutable.HashMap.empty[Int, mutable.Map[String, String]]
>           )
>         }
>         idx
>       }
>
>       def clear(): Unit = {
>         globalResults.clear()
>         globalRetractResults.clear()
>         globalUpsertResults.clear()
>       }
>     }
>
>     abstract class AbstractExactlyOnceSink[T]
>       extends RichSinkFunction[T]
>       with CheckpointedFunction {
>       protected var resultsState: ListState[String] = _
>       protected var localResults: mutable.ArrayBuffer[String] = _
>       protected val idx: Int = StreamTestSink.getNewSinkId
>
>       protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>       protected var globalRetractResults
>         : mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>       protected var globalUpsertResults
>         : mutable.Map[Int, mutable.Map[String, String]] = _
>
>       def isInitialized: Boolean = globalResults != null
>
>       override def initializeState(context: FunctionInitializationContext): Unit = {
>         resultsState = context.getOperatorStateStore
>           .getListState(
>             new ListStateDescriptor[String]("sink-results", Types.STRING)
>           )
>
>         localResults = mutable.ArrayBuffer.empty[String]
>
>         if (context.isRestored) {
>           for (value <- resultsState.get().asScala) {
>             localResults += value
>           }
>         }
>
>         val taskId = getRuntimeContext.getIndexOfThisSubtask
>         StreamTestSink.synchronized(
>           StreamTestSink.globalResults(idx) += (taskId -> localResults)
>         )
>       }
>
>       override def snapshotState(context: FunctionSnapshotContext): Unit = {
>         resultsState.clear()
>         for (value <- localResults) {
>           resultsState.add(value)
>         }
>       }
>
>       protected def clearAndStashGlobalResults(): Unit = {
>         if (globalResults == null) {
>           StreamTestSink.synchronized {
>             globalResults = StreamTestSink.globalResults.remove(idx).get
>             globalRetractResults =
>               StreamTestSink.globalRetractResults.remove(idx).get
>             globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
>           }
>         }
>       }
>
>       protected def getResults: List[String] = {
>         clearAndStashGlobalResults()
>         val result = mutable.ArrayBuffer.empty[String]
>         this.globalResults.foreach {
>           case (_, list) => result ++= list
>         }
>         result.toList
>       }
>     }
>
>     final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
>       override def invoke(value: T) {
>         localResults += value.toString
>       }
>
>       override def getResults: List[String] = super.getResults
>     }
>
>
>
>     On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <[hidden email]
>     <mailto:[hidden email]>> wrote:
>
>         @Timo: Is this sth that would work when using the new type
>         stack? From
>         the message I'm assuming it's using the older type stack.
>
>         @Rex: Which Flink version are you using and could you maybe post
>         the
>         code snipped that you use to do conversions?
>
>         Best,
>         Aljoscha
>
>         On 02.11.20 06:50, Rex Fenley wrote:
>          > Maybe this is related to this issue?
>          > https://issues.apache.org/jira/browse/FLINK-17683
>          >
>          > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <[hidden email]
>         <mailto:[hidden email]>> wrote:
>          >
>          >> Correction, I'm using Scala case classes not strictly Java
>         POJOs just to
>          >> be clear.
>          >>
>          >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley
>         <[hidden email] <mailto:[hidden email]>> wrote:
>          >>
>          >>> Hello,
>          >>>
>          >>> I keep running into trouble moving between DataStream and
>         SQL with POJOs
>          >>> because my nested POJOs turn into LEGACY('STRUCTURED_TYPE',
>         is there any
>          >>> way to convert them back to POJOs in Flink when converting
>         a SQL Table back
>          >>> to a DataStream?
>          >>>
>          >>> Thanks!
>          >>>
>          >>> --
>          >>>
>          >>> Rex Fenley  |  Software Engineer - Mobile and Backend
>          >>>
>          >>>
>          >>> Remind.com <https://www.remind.com/> |  BLOG
>         <http://blog.remind.com/>
>          >>>   |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>          >>> <https://www.facebook.com/remindhq>
>          >>>
>          >>
>          >>
>          >> --
>          >>
>          >> Rex Fenley  |  Software Engineer - Mobile and Backend
>          >>
>          >>
>          >> Remind.com <https://www.remind.com/> |  BLOG
>         <http://blog.remind.com/>  |
>          >>   FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>          >> <https://www.facebook.com/remindhq>
>          >>
>          >
>          >
>
>
>
>     --
>
>     Rex Fenley|Software Engineer - Mobile and Backend
>
>
>     Remind.com <https://www.remind.com/>| BLOG
>     <http://blog.remind.com/> | FOLLOW US
>     <https://twitter.com/remindhq> | LIKE US
>     <https://www.facebook.com/remindhq>
>
>
>
> --
>
> Rex Fenley|Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/>| BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
>



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: LEGACY('STRUCTURED_TYPE' to pojo

Timo Walther
It was planned for 1.12 but didn't make it. 1.13 should fully implement
FLIP-136. I just created issues to monitor the progress:

https://issues.apache.org/jira/browse/FLINK-19976

Regards,
Timo

On 04.11.20 18:43, Rex Fenley wrote:

> Thank you for the info!
>
> Is there a timetable for when the next version with this change might
> release?
>
> On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <[hidden email]
> <mailto:[hidden email]>> wrote:
>
>     Hi Rex,
>
>     sorry for the late reply. POJOs will have much better support in the
>     upcoming Flink versions because they have been fully integrated with
>     the
>     new table type system mentioned in FLIP-37 [1] (e.g. support for
>     immutable POJOs and nested DataTypeHints etc).
>
>     For queries, scalar, and table functions you can already use the full
>     POJOs within the table ecosystem.
>
>     However, the only missing piece is the new translation of POJOs from
>     Table API to DataStream API. This will be fixed in FLIP-136 [2]. Until
>     then I would recommend to either use `Row` as the output of the table
>     API or try to use a scalar function before that maps to the desired
>     data
>     structure.
>
>     I hope this helps a bit.
>
>     Regards,
>     Timo
>
>     [1]
>     https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
>     [2]
>     https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>
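(For illustration only, not code from this thread: a minimal sketch of the `Row` workaround described above, assuming the Flink 1.11 Scala bridge API and the Order/ProductItem case classes and StringSink from the test quoted earlier. The field positions follow the query schema user, product, amount and are illustrative.)

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    // Convert the Table to DataStream[Row] instead of DataStream[Order], so the
    // planner never has to map the LEGACY('STRUCTURED_TYPE') column back to a POJO,
    // then rebuild the case class by hand on the DataStream side.
    val rows: DataStream[Row] = result.toAppendStream[Row]

    val orders: DataStream[Order] = rows.map { row =>
      // The nested ROW<`name`, `id`> column is expected to arrive as a nested Row here.
      val product = row.getField(1).asInstanceOf[Row]
      Order(
        row.getField(0).asInstanceOf[Long],
        ProductItem(
          product.getField(0).asInstanceOf[String],
          product.getField(1).asInstanceOf[Long]
        ),
        row.getField(2).asInstanceOf[Int]
      )
    }

    orders.addSink(new StringSink[Order]())

The map back to Order only relies on the implicit TypeInformation from org.apache.flink.streaming.api.scala._, so no legacy structured type is involved once the data leaves the table ecosystem.
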
>     On 02.11.20 21:44, Rex Fenley wrote:
>      > My jobs normally use the blink planner, I noticed with this test
>     that
>      > may not be the case.
>      >
>      > On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <[hidden email]
>     <mailto:[hidden email]>
>      > <mailto:[hidden email] <mailto:[hidden email]>>> wrote:
>      >
>      >     Flink 1.11.2 with Scala 2.12
>      >
>      >     Error:
>      >     [info] JobScalaTest:
>      >     [info] - dummy *** FAILED ***
>      >     [info]   org.apache.flink.table.api.ValidationException:
>     Field types
>      >     of query result and registered TableSink  do not match.
>      >     [info] Query schema: [user: BIGINT, product: ROW<`name`
>      >     VARCHAR(2147483647), `id` BIGINT>, amount: INT]
>      >     [info] Sink schema: [user: BIGINT, product:
>      >     LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem, ...>'),
>      >     amount: INT]