让代码分布式运行是所有分布式计算框架需要解决的最基本的问题。 Spark是大数据领域中相当火热的计算框架,在大数据分析领域有一统江湖的趋势,网上对于Spark源码分析的文章有很多,但是介绍Spark如何处理代码分布式执行问题的资料少之又少,这也是我撰写文本的目的。 Spark运行在JVM之上,任务的执行依赖序列化及类加载机制,因此本文会重点围绕这两个主题介绍Spark对代码分布式执行的处理。本文假设读者对Spark、Java、Scala有一定的了解,代码示例基于Scala,Spark源码基于2.1.0版本。阅读本文你可以了解到:
根据以上内容,读者可以基于JVM相关的语言构建一个自己的分布式计算服务框架。 Java对象序列化序列化(Serialization)是将对象的状态信息转换为可以存储或传输的形式的过程。所谓的状态信息指的是对象在内存中的数据,Java中一般指对象的字段数据。我们开发Java应用的时候或多或少都处理过对象序列化,对象常见的序列化形式有JSON、XML等。 JDK中内置一个ObjectOutputStream类可以将对象序列化为二进制数据,使用ObjectOutputStream序列化对象时,要求对象所属的类必须实现java.io.Serializable接口,否则会报java.io.NotSerializableException的异常。 基本的概念先介绍到这。接下来我们一起探讨一个问题:Java的方法能否被序列化? 假设我们有如下的**Task类(Java类): import java.io.Serializable;
public abstract class Task implements Serializable {
public void run() {
System.out.println("run task!");
}
}
public class **Task extends Task {
@Override
public void run() {
System.out.println("run ** task!");
}
}
还有一个用于将对象序列化到文件的工具类FileSerializer: import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
object FileSerializer {
def writeObjectToFile(obj: Object, file: String) = {
val fileStream = new FileOutputStream(file)
val oos = new ObjectOutputStream(fileStream)
oos.writeObject(obj)
oos.close()
}
def readObjectFromFile(file: String): Object = {
val fileStream = new FileInputStream(file)
val ois = new ObjectInputStream(fileStream)
val obj = ois.readObject()
ois.close()
obj
}
}
简单起见,我们采用将对象序列化到文件,然后通过反序列化执行的方式来模拟代码的分布式执行。**Task就是我们需要模拟分布式执行的代码。我们先将**Task序列化到文件中: val task = new **Task() FileSerializer.writeObjectToFile(task, "task.ser") 然后将**Task类从我们的代码中删除,此时只有task.ser文件中含有task对象的序列化数据。接下来我们执行下面的代码: val task = FileSerializer.readObjectFromFile("task.ser").asInstanceOf[Task]
task.run()
请各位读者思考,上面的代码执行后会出现什么样的结果?
实际执行会出现形如下面的异常: Exception in thread "main" java.lang.ClassNotFoundException: site.stanzhai.serialization.**Task
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at site.stanzhai.serialization.FileSerializer$.readObjectFromFile(FileSerializer.scala:20)
从异常信息来看,反序列过程中找不到**Task类。由此可以推断序列化后的数据是不包含类的定义信息的。那么,ObjectOutputStream到底序列化了哪些信息呢? 对ObjectOutputStream实现机制感兴趣的同学可以去看下JDK中这个类的实现,ObjectOutputStream序列化对象时,从父类的数据开始序列化到子类,如果override了writeObject方法,会反射调用writeObject来序列化数据。序列化的数据会按照以下的顺序以二进制的形式输出到OutputStream中:
回到我们的问题上:Java的方法能否被序列化?通过我们代码示例及分析,想必大家对这个问题应该清楚了。通过ObjectOutputStream序列化对象,仅包含类的描述(而非定义),对象的状态数据,由于缺少类的定义,也就是缺少**Task的字节码,反序列化过程中就会出现ClassNotFound的异常。 如何让我们反序列化的对象能正常使用呢?我们还需要了解类加载器。 类加载器:ClassLoaderClassLoader在Java中是一个抽象类,ClassLoader的作用是加载类,给定一个类名,ClassLoader会尝试查找或生成类的定义,一种典型的加载策略是将类名对应到文件名上,然后从文件系统中加载class file。 在我们的示例中,反序列化**Task失败,是因为JVM找不到类的定义,因此要确保正常反序列化,我们必须将**Task的class文件保存下来,反序列化的时候能够让ClassLoader加载到**Task的class。 接下来,我们对代码做一些改造,添加一个ClassManipulator类,用于将对象的class文件导出到当前目录的文件中,默认的文件名就是对象的类名(不含包名): object ClassManipulator {
def saveClassFile(obj: AnyRef): Unit = {
val classLoader = obj.getClass.getClassLoader
val className = obj.getClass.getName
val classFile = className.replace('.', '/') ".class"
val stream = classLoader.getResourceAsStream(classFile)
// just use the class ** name as the file name
val outputFile = className.split('.').last ".class"
val fileStream = new FileOutputStream(outputFile)
var data = stream.read()
while (data != -1) {
fileStream.write(data)
data = stream.read()
}
fileStream.flush()
fileStream.close()
}
}
按照JVM的规范,假设对package.**这样的一个类编译,编译后的class文件为package/**.class,因此我们可以根据路径规则,从当前JVM进程的Resource中得到指定类的class数据。 在删除**Task前,我们除了将task序列化到文件外,还需要将task的class文件保存起来,执行完下面的代码,**Task类就可以从代码中剔除了: val task = new **Task() FileSerializer.writeObjectToFile(task, "task.ser") ClassManipulator.saveClassFile(task) 由于我们保存class文件的方式比较特殊,既不在jar包中,也不是按package/ClassName.class这种标准的保存方式,因此还需要实现一个自定义的FileClassLoader按照我们保存class文件的方式来加载所需的类: class FileClassLoader() extends ClassLoader {
override def findClass(fullClassName: String): Class[_] = {
val file = fullClassName.split('.').last ".class"
val in = new FileInputStream(file)
val bos = new ByteArrayOutputStream
val bytes = new Array[Byte](4096)
var done = false
while (!done) {
val num = in.read(bytes)
if (num |
|
声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系
[邮箱地址] 删除
|