Scala: java.lang.ClassNotFoundException occurs when I use the map function of a DataFrame on Spark
I have a DataFrame "orderedDF" with the schema below:

root
 |-- schoolid: string (nullable = true)
 |-- count(studentid): long (nullable = false)
 |-- count(teacherid): long (nullable = false)
 |-- sum(size): long (nullable = true)
 |-- sum(documentcount): long (nullable = true)
 |-- avg_totalscore: double (nullable = true)
Here is the data of the DataFrame "orderedDF":
+--------+----------------+----------------+---------+------------------+--------------+
|schoolid|count(studentid)|count(teacherid)|sum(size)|sum(documentcount)|avg_totalscore|
+--------+----------------+----------------+---------+------------------+--------------+
|school03|               2|               2|      195|               314|         100.0|
|school02|               2|               2|      193|               330|          94.5|
|school01|               2|               2|      294|               285|          83.4|
|school04|               2|               2|      263|               415|          72.5|
|school05|               2|               2|      263|               415|          62.5|
|school07|               2|               2|      263|               415|          52.5|
|school09|               2|               2|      263|               415|          49.8|
|school08|               2|               2|      263|               415|          42.3|
|school06|               2|               2|      263|               415|          32.5|
+--------+----------------+----------------+---------+------------------+--------------+
As you can see, the column "avg_totalscore" is ordered descending. Now I have a problem: I want to partition the rows into 3 groups, as below:
+--------+----------------+----------------+---------+------------------+--------------+
|schoolid|count(studentid)|count(teacherid)|sum(size)|sum(documentcount)|avg_totalscore|
+--------+----------------+----------------+---------+------------------+--------------+
|great   |               2|               2|      195|               314|         100.0|
|great   |               2|               2|      193|               330|          94.5|
|great   |               2|               2|      294|               285|          83.4|
|good    |               2|               2|      263|               415|          72.5|
|good    |               2|               2|      263|               415|          62.5|
|good    |               2|               2|      263|               415|          52.5|
|bad     |               2|               2|      263|               415|          49.8|
|bad     |               2|               2|      263|               415|          42.3|
|bad     |               2|               2|      263|               415|          32.5|
+--------+----------------+----------------+---------+------------------+--------------+
In other words, I want to divide the schools into 3 groups according to "avg_totalscore" — great schools, good schools, and bad schools — at a ratio of 3:3:3.
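The intended split logic — first third of the ranked rows labeled "great", second third "good", rest "bad" — can be illustrated in plain Scala, independent of Spark. This is a minimal sketch only; the object and method names are hypothetical, and the threshold arithmetic mirrors the floor/ceil computation in the solution that follows:

```scala
// Label a row by its rank position out of `total` rows,
// assuming rows are already sorted by avg_totalscore descending.
object SplitDemo {
  def label(index: Long, total: Long): String = {
    val firstSplit = math.floor(total * (1.0 / 3)).toLong  // 3 when total = 9
    val secondSplit = math.ceil(total * (2.0 / 3)).toLong  // 6 when total = 9
    if (index < firstSplit) "great"
    else if (index < secondSplit) "good"
    else "bad"
  }

  def main(args: Array[String]): Unit = {
    val scores = Seq(100.0, 94.5, 83.4, 72.5, 62.5, 52.5, 49.8, 42.3, 32.5)
    val labeled = scores.zipWithIndex.map { case (s, i) =>
      (label(i, scores.size), s)
    }
    labeled.foreach(println)  // ("great",100.0) ... ("bad",32.5)
  }
}
```

With 9 rows, indices 0-2 map to "great", 3-5 to "good", and 6-8 to "bad", matching the desired table above.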
My solution is the following:
val num = orderedDF.count()
val first_split_num = math.floor(num * (1.0 / 3))
val second_split_num = math.ceil(num * (2.0 / 3))
val accumu = SparkContext.getOrCreate(Configuration.getSparkConf).accumulator(0, "group num")
val rdd = orderedDF.map(row => {
  val group = accumu match {
    case a: Accumulator[Int] if a.value <= first_split_num => "great"
    case b: Accumulator[Int] if b.value <= second_split_num => "good"
    case _ => "bad"
  }
  accumu += 1
  // Replace column 0 (schoolid) with the group label; keep the remaining 5 columns
  Row(group, row(1), row(2), row(3), row(4), row(5))
})
val result = sqlContext.createDataFrame(rdd, orderedDF.schema)
The code above seems OK — there is no exception at that point — but when I use:
result.collect().foreach(println)
or
result.show()
I get a ClassNotFoundException, and I don't know the reason. Can anyone help me? Thank you very much!
Here are the details of the exception:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 44.0 failed 4 times, most recent failure: Lost task 0.3 in stage 44.0 (TID 3644, node1): java.lang.ClassNotFoundException: com.lancoo.ecbdc.business.ComparativeAnalysisBusiness$$anonfun$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
java.lang.ClassNotFoundException: com.lancoo.ecbdc.business.ComparativeAnalysisBusiness$$anonfun$1
The class loader could not load the above-mentioned class, per the exception. Can you provide more information on how the class is used in your code?
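For context: the missing class is an anonymous function compiled from the `map` closure (`...$$anonfun$1`), and a common cause of this pattern is that the application jar containing that compiled closure never reaches the executors, so they cannot deserialize the task. As a hedged suggestion only (the paths, master URL, and main-class choice below are placeholder assumptions), make sure the jar is shipped to the cluster when the job is submitted:

```shell
# Sketch: submit the application so that the jar containing the compiled
# closures is distributed to every executor. All paths are placeholders.
spark-submit \
  --class com.lancoo.ecbdc.business.ComparativeAnalysisBusiness \
  --master spark://master:7077 \
  /path/to/application.jar
```

Equivalently, when the SparkConf is built in code (as with `Configuration.getSparkConf` here), `SparkConf.setJars(...)` can list the application jar so Spark distributes it to the executors.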