Flink join operator after sorting seems to group fields (Scala)


Felipe Gutierrez
Hi all,

I have implemented a simple Scala object using Flink to play with the join operator. After the join produced my results, I decided to sort the output by the first field (.sortPartition(0, Order.ASCENDING)). However, the output only seems to be ordered within groups: it shows two separate groups of "Fyodor Dostoyevsky". Why is this happening? How do I sort the complete DataSet?

Kind Regards,
Felipe

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

object JoinBooksAndAuthors {
  val AUTHOR_ID_FIELD: Int = 0
  val AUTHOR_NAME_FIELD: Int = 1

  val BOOK_AUTHORID_FIELD: Int = 0
  val BOOK_YEAR_FIELD: Int = 1
  val BOOK_NAME_FIELD: Int = 2

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    // (author id, author name) records from a tab-separated file
    val authors = env.readCsvFile[(Int, String)](
      "downloads/authors.tsv",
      fieldDelimiter = "\t",
      lineDelimiter = "\n",
      includedFields = Array(0, 1)
    )

    // (author id, year, book title) records from a tab-separated file
    val books = env.readCsvFile[(Int, Short, String)](
      "downloads/books.tsv",
      fieldDelimiter = "\t",
      lineDelimiter = "\n",
      includedFields = Array(0, 1, 2)
    )

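    // Join authors and books on the author id, keep (author name, book title),
    // then sort on the author name (field 0 of the mapped tuple) and print.
    authors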
      .join(books)
      .where(AUTHOR_ID_FIELD)
      .equalTo(BOOK_AUTHORID_FIELD)
      .map(tuple => (tuple._1._2, tuple._2._3))
      .sortPartition(0, Order.ASCENDING)
      .print()
  }
}

Output:

(Charles Bukowski,Women)
(Charles Bukowski,The Most Beautiful Woman in Town)
(Charles Bukowski,Hot Water Music)
(Charles Bukowski,Barfly)
(Charles Bukowski,Notes of a Dirty Old Man)
(Charles Bukowski,Ham on Rye)
(Fyodor Dostoyevsky,The Brothers Karamazov)
(Fyodor Dostoyevsky,The Double: A Petersburg Poem)
(Fyodor Dostoyevsky,Poor Folk)
(George Orwell,Coming Up for Air)
(George Orwell,Burmese Days)
(George Orwell,A Clergyman's Daughter)
(George Orwell,Down and Out in Paris and London)
(Albert Camus,The Plague)
(Fyodor Dostoyevsky,The Eternal Husband)
(Fyodor Dostoyevsky,The Gambler)
(Fyodor Dostoyevsky,The House of the Dead)
(Fyodor Dostoyevsky,Crime and Punishment)
(Fyodor Dostoyevsky,Netochka Nezvanova)
.....





--
--
-- Felipe Oliveira Gutierrez
-- skype: felipe.o.gutierrez

Re: Flink join operator after sorting seems to group fields (Scala)

Xingcan Cui
Hi Felipe,

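the `sortPartition()` method only sorts each partition of a DataSet LOCALLY, so the output consists of several independently sorted runs, one per partition. To get a globally sorted result, call it after `partitionByRange()`, which assigns each key range to its own partition (e.g., `result.partitionByRange(0).sortPartition(0, Order.ASCENDING)`).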
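To make the difference concrete, here is a minimal plain-Scala simulation (not Flink code). It assumes the joined records end up split into contiguous chunks, roughly like input splits, and mimics `sortPartition` by sorting each chunk independently and concatenating the chunks. It then range-partitions the same records before sorting. The object and helper names are hypothetical, and the range boundaries are taken from the full sorted key list for simplicity; Flink instead derives them from a sample of the data.

```scala
// Plain-Scala simulation (not Flink code) of why sorting each partition on its own
// is not a global sort, and how range partitioning fixes that.
object SortPartitionSimulation {
  type Rec = (String, String) // (author name, book title), like the mapped join output

  // Hypothetical stand-in for input splits: n contiguous chunks of the input.
  def splitContiguous(recs: Seq[Rec], n: Int): Seq[Seq[Rec]] = {
    val size = math.max(1, math.ceil(recs.size.toDouble / n).toInt)
    recs.grouped(size).toList
  }

  // Range partitioning: partition p receives the keys between boundary p-1 and boundary p.
  // Boundaries come from the full sorted key list here (an assumption; Flink samples).
  def rangePartition(recs: Seq[Rec], n: Int): Seq[Seq[Rec]] = {
    val keys = recs.map(_._1).sorted
    val bounds = (1 until n).map(i => keys(i * keys.size / n))
    (0 until n).map(p => recs.filter(r => bounds.count(b => r._1.compareTo(b) >= 0) == p))
  }

  // Mimics sortPartition(0, Order.ASCENDING): each partition is sorted independently,
  // then the partitions are concatenated, as when all results are printed together.
  def sortEachPartition(parts: Seq[Seq[Rec]]): Seq[Rec] =
    parts.flatMap(_.sortBy(_._1))

  val records: Seq[Rec] = Seq(
    ("Fyodor Dostoyevsky", "Poor Folk"),
    ("Charles Bukowski", "Women"),
    ("George Orwell", "Burmese Days"),
    ("Fyodor Dostoyevsky", "The Gambler"),
    ("Albert Camus", "The Plague"),
    ("Charles Bukowski", "Barfly")
  )

  def main(args: Array[String]): Unit = {
    val local = sortEachPartition(splitContiguous(records, 2))
    val global = sortEachPartition(rangePartition(records, 2))
    println("local:  " + local.map(_._1).mkString(", "))
    println("global: " + global.map(_._1).mkString(", "))
  }
}
```

With two partitions, the "local" line prints Charles Bukowski, Fyodor Dostoyevsky, George Orwell, Albert Camus, Charles Bukowski, Fyodor Dostoyevsky: two sorted runs, with "Fyodor Dostoyevsky" appearing in both, just like the output in the question. The "global" line is fully sorted, because every key in the first range partition is smaller than every key in the second, so concatenating the locally sorted partitions preserves the order.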

Hope that helps,
Xingcan

On 3 Mar 2018, at 9:33 PM, Felipe Gutierrez <[hidden email]> wrote:



Re: Flink join operator after sorting seems to group fields (Scala)

Felipe Gutierrez
Thanks, Xingcan!

Now my output is correct, and I understand a bit more about Flink's architecture: I have to keep in mind that the processing is split across different partitions, even when I run it locally.
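As a rough illustration of why a local run still behaves this way: a local Flink environment typically runs operators with a parallelism tied to the number of available CPU cores (treat this as an assumption and check the defaults of your Flink version). The plain-Scala sketch below (not Flink code; `PartitionCountDemo` and its helpers are hypothetical) shows that "sort each partition independently" only coincides with a global sort when there is a single partition. That is also why running the sort with a parallelism of 1, for example via `env.setParallelism(1)`, would produce a fully sorted result, at the cost of giving up parallelism.

```scala
// Plain-Scala illustration (not Flink code): whether sorting each partition independently
// yields a globally sorted result depends on how many partitions the data is spread over.
object PartitionCountDemo {
  // Split keys into n contiguous chunks, sort each chunk, and concatenate the chunks.
  def sortPerPartition(keys: Seq[String], n: Int): Seq[String] = {
    val size = math.max(1, math.ceil(keys.size.toDouble / n).toInt)
    keys.grouped(size).toList.flatMap(_.sorted)
  }

  // True if every key is less than or equal to its successor.
  def isGloballySorted(keys: Seq[String]): Boolean =
    keys.zip(keys.drop(1)).forall { case (a, b) => a.compareTo(b) <= 0 }

  val keys: Seq[String] = Seq(
    "Fyodor Dostoyevsky", "Charles Bukowski", "George Orwell",
    "Fyodor Dostoyevsky", "Albert Camus", "Charles Bukowski"
  )

  def main(args: Array[String]): Unit = {
    // Assumption: a local run may use one partition per CPU core.
    val cores = Runtime.getRuntime.availableProcessors
    for (n <- Seq(1, 2, cores))
      println(s"partitions=$n globallySorted=${isGloballySorted(sortPerPartition(keys, n))}")
  }
}
```

With one partition the check is always true; with two partitions it is false for this input, and the result for the core count depends on the machine.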

Kind Regards,
Felipe

On Sat, Mar 3, 2018 at 11:25 AM, Xingcan Cui <[hidden email]> wrote: