Spark DataFrame explode function


Can someone please explain why `case Row(employee: Seq[Row])` is used after exploding a DataFrame field that holds a collection of elements? And can someone explain why `asInstanceOf` is required to read the values of the exploded field?

Here is the syntax:

val explodedDepartmentWithEmployeesDF =
  departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {
    case Row(employee: Seq[Row]) =>
      employee.map(employee =>
        Employee(employee(0).asInstanceOf[String],
          employee(1).asInstanceOf[String],
          employee(2).asInstanceOf[String]))
  }

First, a note: I cannot explain why your explode() turns the data into Row(employee: Seq[Row]), as I don't know the schema of your DataFrame. I have to assume it has to do with the structure of your data.
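As background for the pattern itself: Spark's Row comes with an extractor, which is what lets `case Row(employee: Seq[Row])` bind the single field of the row and check at runtime that it is a Seq. A plain-Scala sketch of the same mechanism, with a made-up `MyRow` standing in for Spark's Row:

```scala
// MyRow is a hypothetical stand-in for org.apache.spark.sql.Row:
// its extractor exposes the row's fields for pattern matching.
object MyRow {
  def unapplySeq(row: Seq[Any]): Option[Seq[Any]] = Some(row)
}

// a "row" whose single field is a collection, like your employees field
val row: Seq[Any] = Seq(Seq("John", "Doe", "123"))

val names = row match {
  // binds the one field and checks at runtime that it is a Seq
  case MyRow(employees: Seq[_]) => employees.map(_.toString)
}
// names == Seq("John", "Doe", "123")
```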

Not knowing the original data, I have created a small data set to work from:

scala> val df = sc.parallelize(Array(
     |   (1, "dsfds dsf dasf dsf dsf d"),
     |   (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]

If we map over it, we can see that it returns rows containing data of type Any:

scala> df.map { case row: Row => (row(0), row(1)) }
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33

The type information has been lost, which is why you need to explicitly specify the types when you want to use the data in the row:

scala> df.map { case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) }
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33
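The same loss of static type information can be reproduced without Spark at all. Indexing into an untyped container yields `Any`, and `asInstanceOf` is what recovers the concrete type; this is a plain-Scala sketch, not Spark code:

```scala
// A row-like untyped container: indexing returns Any,
// just as apply(i) does on a Spark Row
val row: Seq[Any] = Seq(1, "dsfds dsf dasf dsf dsf d")

val id   = row(0).asInstanceOf[Int]    // cast back to Int
val text = row(1).asInstanceOf[String] // cast back to String

// A wrong cast (e.g. row(1).asInstanceOf[Int]) would compile but fail
// at runtime with a ClassCastException, which is why each cast must
// match the actual schema of the data.
```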

So, in order to explode it, I have to do the following:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
df.explode(col("id"), col("text")) { case row: Row =>
  val id = row(0).asInstanceOf[Int]
  val words = row(1).asInstanceOf[String].split(" ")
  words.map(word => (id, word))
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]

scala> res30.show
+---+--------------------+---+-----+
| id|                text| _1|   _2|
+---+--------------------+---+-----+
|  1|dsfds dsf dasf ds...|  1|dsfds|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1| dasf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|  dsf|
|  1|dsfds dsf dasf ds...|  1|    d|
|  2|2344 2353 24 2343...|  2| 2344|
|  2|2344 2353 24 2343...|  2| 2353|
|  2|2344 2353 24 2343...|  2|   24|
|  2|2344 2353 24 2343...|  2|23432|
|  2|2344 2353 24 2343...|  2|  234|
+---+--------------------+---+-----+
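Conceptually, explode behaves like a flatMap: each input row yields zero or more output rows. A minimal plain-Scala sketch of the same transformation on an ordinary collection (no Spark required):

```scala
val data = Seq((1, "dsfds dsf d"), (2, "2344 2353"))

// one output pair per word, keyed by the original id -- the same
// row multiplication that explode performs on the DataFrame
val exploded = data.flatMap { case (id, text) =>
  text.split(" ").map(word => (id, word))
}
// exploded: Seq[(Int, String)]
```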

If you want named columns, you can define a case class to hold the exploded data:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.Row
case class ExplodedData(word: String)
df.explode(col("id"), col("text")) { case row: Row =>
  val words = row(1).asInstanceOf[String].split(" ")
  words.map(word => ExplodedData(word))
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]

scala> res35.select("id", "word").show
+---+-----+
| id| word|
+---+-----+
|  1|dsfds|
|  1|  dsf|
|  1| dasf|
|  1|  dsf|
|  1|  dsf|
|  1|    d|
|  2| 2344|
|  2| 2353|
|  2|   24|
|  2|23432|
|  2|  234|
+---+-----+
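The case-class version works the same way on plain collections; returning a case class instead of a tuple is what gives the new column a name rather than a positional label like _1. A small sketch, reusing the ExplodedData shape from above:

```scala
case class ExplodedData(word: String)

val data = Seq((1, "dsfds dsf"), (2, "2344"))

// each element carries a named field instead of a tuple slot
val exploded = data.flatMap { case (_, text) =>
  text.split(" ").map(word => ExplodedData(word))
}
```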

Hope this brings clarity.

