Hello,

I keep running into trouble moving between the DataStream and SQL APIs with POJOs, because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...) types. Is there any way to convert them back to POJOs when converting a SQL Table back to a DataStream?

Thanks!

-- Rex Fenley
Correction: I'm using Scala case classes, not strictly Java POJOs, just to be clear.
Maybe this is related to this issue? https://issues.apache.org/jira/browse/FLINK-17683
@Timo: Is this sth that would work when using the new type stack? From
the message I'm assuming it's using the older type stack.

@Rex: Which Flink version are you using, and could you maybe post the code snippet that you use to do the conversions?

Best,
Aljoscha
Flink 1.11.2 with Scala 2.12

Error:
[info] JobScalaTest:
[info] - dummy *** FAILED ***
[info] org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
[info] Query schema: [user: BIGINT, product: ROW<`name` VARCHAR(2147483647), `id` BIGINT>, amount: INT]
[info] Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem, rO0ABXNyAB1Kb2JTY2FsYVRlc3Q... (long base64-serialized TypeInformation omitted)>'), amount: INT]
[info] at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
[info] at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
[info] at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
[info] at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
[info] at scala.collection.Iterator.foreach(Iterator.scala:943)
[info] at scala.collection.Iterator.foreach$(Iterator.scala:943)
[info] at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[info] at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info] at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
[info] at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

Code:

import com.remind.graph.people.PeopleJobScala

import org.scalatest.funsuite._
import org.scalatest.BeforeAndAfter

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.table.runtime.util._
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.types.Row

import java.io.Serializable
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

case class Order(user: Long, product: ProductItem, amount: Int) {
  def this() {
    this(0, null, 0)
  }

  override def toString(): String = {
    return "Order{" +
      "user=" + user +
      ", product='" + product + '\'' +
      ", amount=" + amount +
      '}'
  }
}

case class ProductItem(name: String, id: Long) {
  def this() {
    this(null, 0)
  }

  override def toString(): String = {
    return "Product{" +
      "name='" + name + '\'' +
      ", id=" + id +
      '}'
  }
}

class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  before {
    this.env = StreamExecutionEnvironment.getExecutionEnvironment
    this.env.setParallelism(2)
    this.env.getConfig.enableObjectReuse()
    val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
    this.tEnv = StreamTableEnvironment.create(env, setting)
  }

  after {
    StreamTestSink.clear()
    // TestValuesTableFactory.clearAllData()
  }

  def dateFrom(stringDate: String): java.sql.Date = {
    val date = new SimpleDateFormat("dd/MM/yyyy").parse(stringDate)
    return new java.sql.Date(date.getTime())
  }

  def printTable(table: Table) = {
    println(table)
    table.printSchema()
    println(table.getSchema().getFieldNames().mkString(", "))
  }

  def printDataStream(dataStream: DataStream[_]) = {
    println(dataStream)
    println(dataStream.dataType)
  }

  test("dummy") {
    val orderA: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(1L, new ProductItem("beer", 10L), 3),
        new Order(1L, new ProductItem("diaper", 11L), 4),
        new Order(3L, new ProductItem("rubber", 12L), 2)
      )
    )

    val orderB: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(2L, new ProductItem("pen", 13L), 3),
        new Order(2L, new ProductItem("rubber", 12L), 3),
        new Order(4L, new ProductItem("beer", 10L), 1)
      )
    )

    println(orderB)
    println(orderB.dataType)

    // convert DataStream to Table
    val tableA = this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    println(tableA)
    tableA.printSchema()
    println(tableA.getSchema().getFieldNames().mkString(", "))
    // register DataStream as Table
    this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = this.tEnv.sqlQuery(s"""
      |SELECT * FROM $tableA WHERE amount > 2
      |UNION ALL
      |SELECT * FROM OrderB WHERE amount < 2
      """.stripMargin)

    val sink = new StringSink[Order]()
    result.toAppendStream[Order].addSink(sink)

    this.env.execute()

    val expected = List(
      "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
      "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
      "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
    )
    val results = sink.getResults.sorted
    println("results")
    println(results)
    assert(expected.sorted === results)
  }
}

/**
 * Taken from:
 * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
 * There's a whole bunch of other test sinks to choose from there.
 */
object StreamTestSink {
  val idCounter: AtomicInteger = new AtomicInteger(0)

  val globalResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalRetractResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalUpsertResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]

  def getNewSinkId: Int = {
    val idx = idCounter.getAndIncrement()
    this.synchronized {
      globalResults.put(idx, mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]])
      globalRetractResults.put(idx, mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]])
      globalUpsertResults.put(idx, mutable.HashMap.empty[Int, mutable.Map[String, String]])
    }
    idx
  }

  def clear(): Unit = {
    globalResults.clear()
    globalRetractResults.clear()
    globalUpsertResults.clear()
  }
}

abstract class AbstractExactlyOnceSink[T]
    extends RichSinkFunction[T]
    with CheckpointedFunction {
  protected var resultsState: ListState[String] = _
  protected var localResults: mutable.ArrayBuffer[String] = _
  protected val idx: Int = StreamTestSink.getNewSinkId

  protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _

  def isInitialized: Boolean = globalResults != null

  override def initializeState(context: FunctionInitializationContext): Unit = {
    resultsState = context.getOperatorStateStore
      .getListState(new ListStateDescriptor[String]("sink-results", Types.STRING))

    localResults = mutable.ArrayBuffer.empty[String]

    if (context.isRestored) {
      for (value <- resultsState.get().asScala) {
        localResults += value
      }
    }

    val taskId = getRuntimeContext.getIndexOfThisSubtask
    StreamTestSink.synchronized(
      StreamTestSink.globalResults(idx) += (taskId -> localResults)
    )
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    resultsState.clear()
    for (value <- localResults) {
      resultsState.add(value)
    }
  }

  protected def clearAndStashGlobalResults(): Unit = {
    if (globalResults == null) {
      StreamTestSink.synchronized {
        globalResults = StreamTestSink.globalResults.remove(idx).get
        globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
        globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
      }
    }
  }

  protected def getResults: List[String] = {
    clearAndStashGlobalResults()
    val result = mutable.ArrayBuffer.empty[String]
    this.globalResults.foreach {
      case (_, list) => result ++= list
    }
    result.toList
  }
}

final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
  override def invoke(value: T): Unit = {
    localResults += value.toString
  }

  override def getResults: List[String] = super.getResults
}

-- Rex Fenley
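For what it's worth, the LEGACY('STRUCTURED_TYPE', ANY<ProductItem, ...>) in the sink schema looks like the old-stack CaseClassTypeInfo pushed through the new type system. A minimal sketch that seems to reproduce just that mapping, assuming the test's imports above and Flink 1.11's type utilities (untested):

// Sketch: derive the old-stack TypeInformation for the nested case class
// and run it through the legacy type converter, which is roughly what the
// sink schema derivation does for toAppendStream[Order].
import org.apache.flink.table.types.utils.TypeConversions

val productTypeInfo = createTypeInformation[ProductItem] // CaseClassTypeInfo, old type stack
println(TypeConversions.fromLegacyInfoToDataType(productTypeInfo))
// expected: the LEGACY('STRUCTURED_TYPE', ANY<ProductItem, ...>) wrapper from the error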
My jobs normally use the Blink planner; I noticed with this test that may not be the case.
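For example, the settings could pin the planner explicitly to rule that out. A sketch (in 1.11 the Blink planner should already be the default, and the org.apache.flink.table.planner.* frames in the stack trace above look like the Blink planner):

// Sketch: select the planner explicitly instead of relying on the default.
val setting = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner() // or .useOldPlanner() to compare behaviour
  .inStreamingMode()
  .build()
this.tEnv = StreamTableEnvironment.create(env, setting)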
Hi Rex,
sorry for the late reply. POJOs will have much better support in the upcoming Flink versions because they have been fully integrated with the new table type system mentioned in FLIP-37 [1] (e.g. support for immutable POJOs, nested DataTypeHints, etc.).

For queries, scalar functions, and table functions you can already use full POJOs within the table ecosystem. However, the only missing piece is the new translation of POJOs from the Table API to the DataStream API. This will be fixed in FLIP-136 [2]. Until then I would recommend either using `Row` as the output of the Table API, or using a scalar function beforehand that maps to the desired data structure.

I hope this helps a bit.

Regards,
Timo

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
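For the `Row` route, a minimal sketch against the test above (untested; the field order follows the query schema user, product, amount):

// Sketch: convert to Row instead of Order, then map back to the case class
// by hand. The nested ROW field comes back as a nested Row value.
val rows: DataStream[Row] = result.toAppendStream[Row]
val orders: DataStream[Order] = rows.map { row =>
  val product = row.getField(1).asInstanceOf[Row]
  Order(
    row.getField(0).asInstanceOf[Long],         // user: BIGINT
    ProductItem(
      product.getField(0).asInstanceOf[String], // name: VARCHAR
      product.getField(1).asInstanceOf[Long]    // id: BIGINT
    ),
    row.getField(2).asInstanceOf[Int]           // amount: INT
  )
}
orders.addSink(sink)

The scalar-function variant would instead register a function that builds the ProductItem inside the query, so the structured type is produced by the new type stack before the conversion.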
Thank you for the info! Is there a timetable for when the next version with this change might release?

-- Rex Fenley
It was planned for 1.12 but didn't make it. 1.13 should fully implement
FLIP-136. I just created issues to monitor the progress: https://issues.apache.org/jira/browse/FLINK-19976

Regards,
Timo
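Based on the FLIP-136 proposal, the round trip in 1.13 is expected to look roughly like this. A sketch of the proposed API, not a released one (names may still change):

// Sketch: with FLIP-136, structured types should round-trip between the
// Table and DataStream APIs without the LEGACY wrapper.
val table = tEnv.fromDataStream(orderA)
val back: DataStream[Order] = tEnv.toDataStream(table, classOf[Order])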