Spark DataFrame explode function
Can someone please explain why case Row(employee: Seq[Row]) is used after the explode of a DataFrame field that holds a collection of elements? And can someone please explain the reason why asInstanceOf is required to get the values from the exploded field?
Here is the syntax:
val explodedDepartmentWithEmployeesDF = departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {
  case Row(employee: Seq[Row]) => employee.map(employee =>
    Employee(employee(0).asInstanceOf[String], employee(1).asInstanceOf[String], employee(2).asInstanceOf[String]))
}
First, I should note that I cannot explain why your explode() turns into Row(employee: Seq[Row]), as I don't know the schema of your DataFrame. I have to assume it has to do with the structure of your data.
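That said, the shape of the pattern can be sketched without Spark at all. The MiniRow class below is a hypothetical stand-in for Spark's Row, which likewise stores its values untyped; an array-of-struct column then arrives inside the outer row as a single value that is itself a sequence of nested rows, which is what a pattern like Row(employee: Seq[Row]) pulls apart.

```scala
// Hypothetical stand-in for Spark's Row: a row is just an untyped
// sequence of values (plain Scala, no Spark required).
case class MiniRow(values: Any*)

// An array-of-struct column ("employees") shows up inside the outer
// row as one value that is itself a sequence of nested rows.
val employees: Seq[MiniRow] = Seq(MiniRow("a", "b", "c"), MiniRow("d", "e", "f"))
val outer = MiniRow(employees)

// Pattern matching pulls that value back out; the element type of the
// sequence cannot be verified at runtime (type erasure), so the match
// only really checks "is this a Seq".
val extracted = outer match {
  case MiniRow(es: Seq[MiniRow] @unchecked) => es
}

println(extracted.size) // 2
```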
Not knowing your original data, I have created a small data set to work from.
scala> val df = sc.parallelize( Array( (1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]
If I do a map over it, we can see that it returns rows containing data of type Any.
scala> df.map { case row: Row => (row(0), row(1)) }
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33
You have lost the type information, which is why you need to explicitly specify the type when you want to use the data in the row:
scala> df.map { case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) }
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33
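The same loss of type information can be demonstrated without Spark: below, a plain Seq[Any] plays the role of a Row (a hypothetical stand-in), and the cast is what recovers the static type the compiler has lost.

```scala
// A plain Seq[Any] standing in for a Row: the compiler only knows
// that each element has type Any.
val row: Seq[Any] = Seq(1, "dsfds dsf dasf")

// row(0) has static type Any, so e.g. arithmetic on it will not
// compile; asInstanceOf restores the concrete type.
val id: Int = row(0).asInstanceOf[Int]
val words: Array[String] = row(1).asInstanceOf[String].split(" ")

println(id)                  // 1
println(words.mkString(",")) // dsfds,dsf,dasf
```

Note that Spark's Row also exposes typed getters such as getInt(0) or getAs[String](1), which perform the same cast for you.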
So, in order to explode it, I have to do the following:
scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row

df.explode(col("id"), col("text")) { case row: Row =>
  val id = row(0).asInstanceOf[Int]
  val words = row(1).asInstanceOf[String].split(" ")
  words.map(word => (id, word))
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]

scala> res30.show
+---+--------------------+---+-----+
| id|                text| _1|   _2|
+---+--------------------+---+-----+
|  1|dsfds dsf dasf ds...|  1|dsfds|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1| dasf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|    d|
|  2|2344 2353 24 2343...|  2| 2344|
|  2|2344 2353 24 2343...|  2| 2353|
|  2|2344 2353 24 2343...|  2|   24|
|  2|2344 2353 24 2343...|  2|23432|
|  2|2344 2353 24 2343...|  2|  234|
+---+--------------------+---+-----+
If you want named columns, you can define a case class to hold the exploded data:
scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
case class ExplodedData(word: String)

df.explode(col("id"), col("text")) { case row: Row =>
  val words = row(1).asInstanceOf[String].split(" ")
  words.map(word => ExplodedData(word))
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]

scala> res35.select("id", "word").show
+---+-----+
| id| word|
+---+-----+
|  1|dsfds|
|  1|  dsf|
|  1| dasf|
|  1|  dsf|
|  1|  dsf|
|  1|    d|
|  2| 2344|
|  2| 2353|
|  2|   24|
|  2|23432|
|  2|  234|
+---+-----+
Hope this brings clarity.