本文共 3139 字,大约阅读时间需要 10 分钟。
在编写Spark程序时,若在map、filter等算子内部引用了外部变量或函数,可能会导致Task未序列化的问题。这种情况通常发生在引用了当前类成员变量或函数时,导致整个类需要支持序列化,而某些成员变量未能做好序列化准备。
例如,在一个Spark程序中,我们定义了一个类MyTest1
,该类使用了SparkContext
和SparkConf
作为成员变量。由于这些变量未做好序列化处理,导致类整体无法序列化,最终引发Task未序列化错误。
class MyTest1(conf:String) extends Serializable { val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org") private val sparkConf = new SparkConf().setAppName("AppName") private val sc = new SparkContext(sparkConf) val rdd = sc.parallelize(list) private val rootDomain = conf def getResult(): Array[String] = { val result = rdd.filter(item => item.contains(rootDomain)) result.take(result.count().toInt) }}
在运行过程中,错误报告指出SparkContext
和SparkConf
未能序列化,导致Task失败。为了验证这一点,我们尝试对不需要序列化的成员变量进行标注。
class MyTest1(conf:String) extends Serializable { val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org") @transient private val sparkConf = new SparkConf().setAppName("AppName") @transient private val sc = new SparkContext(sparkConf) val rdd = sc.parallelize(list) private val rootDomain = conf def getResult(): Array[String] = { val result = rdd.filter(item => item.contains(rootDomain)) result.take(result.count().toInt) }}
标注后,sparkConf
和sc
不再参与序列化过程,程序能够正常运行。
同样地,当成员函数引用导致类依赖序列化时,也会引发Task未序列化问题。例如,在map操作中使用了一个成员函数来处理域名前缀。
class MyTest1(conf:String) extends Serializable { val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org") private val sparkConf = new SparkConf().setAppName("AppName") private val sc = new SparkContext(sparkConf) val rdd = sc.parallelize(list) def getResult(): Array[(String)] = { val rootDomain = conf val result = rdd.filter(item => item.contains(rootDomain)) .map(item => addWWW(item)) result.take(result.count().toInt) } def addWWW(str:String): String = { if(str.startsWith("www.")) str else "www."+str }}
为了解决这个问题,我们可以将成员函数移至一个object
中,使其不再依赖当前类。
def getResult(): Array[(String)] = { val rootDomain = conf val result = rdd.filter(item => item.contains(rootDomain)) .map(item => UtilTool.addWWW(item)) result.take(result.count().toInt) }object UtilTool { def addWWW(str:String): String = { if(str.startsWith("www.")) str else "www."+str }}
通过上述实例可以看出,引用成员变量或函数会导致整个类需要支持序列化。因此,在类中使用@transient
标注来避免不必要的序列化。
class MyTest1(conf:String) { val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org") private val sparkConf = new SparkConf().setAppName("AppName") private val sc = new SparkContext(sparkConf) val rdd = sc.parallelize(list) def getResult(): Array[String] = { val rootDomain = conf val result = rdd.filter(item => item.contains(rootDomain)) result.take(result.count().toInt) }}
如果去掉extends Serializable
,程序将报错,证明了类需要序列化的必要性。
避免在闭包内部引用成员变量或函数
object
中的静态变量。确保类支持序列化并处理不可序列化成员
Serializable
,并对不可序列化的成员使用@transient
标注。object
中,以减少对类序列化的依赖。通过以上方法,可以有效避免Task未序列化问题,确保Spark程序顺利运行。
转载地址:http://bjrfk.baihongyu.com/