首页 存档 技术 查看内容

深度剖析 Spark 分布式执行原理

2018-3-30 13:00 |来自: 互联网 449 0

摘要: 让代码分布式运行是所有分布式计算框架需要解决的最基本的问题。 Spark是大数据领域中相当火热的计算框架,在大数据分析领域有一统江湖的趋势,网上对于Spark源码分析的文章有很多,但是介绍Spark如何处理代码分布 ...

让代码分布式运行是所有分布式计算框架需要解决的最基本的问题。


Spark是大数据领域中相当火热的计算框架,在大数据分析领域有一统江湖的趋势,网上对于Spark源码分析的文章有很多,但是介绍Spark如何处理代码分布式执行问题的资料少之又少,这也是我撰写文本的目的。


Spark运行在JVM之上,任务的执行依赖序列化及类加载机制,因此本文会重点围绕这两个主题介绍Spark对代码分布式执行的处理。本文假设读者对Spark、Java、Scala有一定的了解,代码示例基于Scala,Spark源码基于2.1.0版本。阅读本文你可以了解到:

  • Java对象序列化机制

  • 类加载器的作用

  • Spark对closure序列化的处理

  • Spark Application的class是如何加载的

  • Spark REPL(spark-shell)中的代码是如何分布式执行的


根据以上内容,读者可以基于JVM相关的语言构建一个自己的分布式计算服务框架。


Java对象序列化

序列化(Serialization)是将对象的状态信息转换为可以存储或传输的形式的过程。所谓的状态信息指的是对象在内存中的数据,Java中一般指对象的字段数据。我们开发Java应用的时候或多或少都处理过对象序列化,对象常见的序列化形式有JSON、XML等。


JDK中内置一个ObjectOutputStream类可以将对象序列化为二进制数据,使用ObjectOutputStream序列化对象时,要求对象所属的类必须实现java.io.Serializable接口,否则会报java.io.NotSerializableException的异常。


基本的概念先介绍到这。接下来我们一起探讨一个问题:Java的方法能否被序列化?


假设我们有如下的**Task类(Java类):

import java.io.Serializable;

public abstract class Task implements Serializable {
    public void run() {
        System.out.println("run task!");
    }
}

public class **Task extends Task {
    @Override
    public void run() {
        System.out.println("run ** task!");
    }
}


还有一个用于将对象序列化到文件的工具类FileSerializer:

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

object FileSerializer {

  def writeObjectToFile(obj: Object, file: String) = {
    val fileStream = new FileOutputStream(file)
    val oos = new ObjectOutputStream(fileStream)
    oos.writeObject(obj)
    oos.close()
  }

  def readObjectFromFile(file: String): Object = {
    val fileStream = new FileInputStream(file)
    val ois = new ObjectInputStream(fileStream)
    val obj = ois.readObject()
    ois.close()
    obj
  }
}


简单起见,我们采用将对象序列化到文件,然后通过反序列化执行的方式来模拟代码的分布式执行。**Task就是我们需要模拟分布式执行的代码。我们先将**Task序列化到文件中:

val task = new **Task()
FileSerializer.writeObjectToFile(task, "task.ser")


然后将**Task类从我们的代码中删除,此时只有task.ser文件中含有task对象的序列化数据。接下来我们执行下面的代码:

val task = FileSerializer.readObjectFromFile("task.ser").asInstanceOf[Task]
task.run()


请各位读者思考,上面的代码执行后会出现什么样的结果?

  • 输出:run ** task! ?

  • 输出:run task! ?

  • 还是会报错?


实际执行会出现形如下面的异常:

Exception in thread "main" java.lang.ClassNotFoundException: site.stanzhai.serialization.**Task
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at site.stanzhai.serialization.FileSerializer$.readObjectFromFile(FileSerializer.scala:20)


从异常信息来看,反序列过程中找不到**Task类。由此可以推断序列化后的数据是不包含类的定义信息的。那么,ObjectOutputStream到底序列化了哪些信息呢?


对ObjectOutputStream实现机制感兴趣的同学可以去看下JDK中这个类的实现,ObjectOutputStream序列化对象时,从父类的数据开始序列化到子类,如果override了writeObject方法,会反射调用writeObject来序列化数据。序列化的数据会按照以下的顺序以二进制的形式输出到OutputStream中:

  1. 类的descriptor(仅仅是类的描述信息,不包含类的定义)

  2. 对象的primitive类型数据(int,boolean等,String和Array是特殊处理的)

  3. 对象的其他obj数据


回到我们的问题上:Java的方法能否被序列化?通过我们代码示例及分析,想必大家对这个问题应该清楚了。通过ObjectOutputStream序列化对象,仅包含类的描述(而非定义),对象的状态数据,由于缺少类的定义,也就是缺少**Task的字节码,反序列化过程中就会出现ClassNotFound的异常。


如何让我们反序列化的对象能正常使用呢?我们还需要了解类加载器。


类加载器:ClassLoader

ClassLoader在Java中是一个抽象类,ClassLoader的作用是加载类,给定一个类名,ClassLoader会尝试查找或生成类的定义,一种典型的加载策略是将类名对应到文件名上,然后从文件系统中加载class file。


在我们的示例中,反序列化**Task失败,是因为JVM找不到类的定义,因此要确保正常反序列化,我们必须将**Task的class文件保存下来,反序列化的时候能够让ClassLoader加载到**Task的class。


接下来,我们对代码做一些改造,添加一个ClassManipulator类,用于将对象的class文件导出到当前目录的文件中,默认的文件名就是对象的类名(不含包名):

object ClassManipulator {
  def saveClassFile(obj: AnyRef): Unit = {
    val classLoader = obj.getClass.getClassLoader
    val className = obj.getClass.getName
    val classFile = className.replace('.', '/')   ".class"
    val stream = classLoader.getResourceAsStream(classFile)

    // just use the class ** name as the file name
    val outputFile = className.split('.').last   ".class"
    val fileStream = new FileOutputStream(outputFile)
    var data = stream.read()
    while (data != -1) {
      fileStream.write(data)
      data = stream.read()
    }
    fileStream.flush()
    fileStream.close()
  }
}


按照JVM的规范,假设对package.**这样的一个类编译,编译后的class文件为package/**.class,因此我们可以根据路径规则,从当前JVM进程的Resource中得到指定类的class数据。


在删除**Task前,我们除了将task序列化到文件外,还需要将task的class文件保存起来,执行完下面的代码,**Task类就可以从代码中剔除了:

val task = new **Task()
FileSerializer.writeObjectToFile(task, "task.ser")
ClassManipulator.saveClassFile(task)


由于我们保存class文件的方式比较特殊,既不在jar包中,也不是按package/ClassName.class这种标准的保存方式,因此还需要实现一个自定义的FileClassLoader按照我们保存class文件的方式来加载所需的类:

class FileClassLoader() extends ClassLoader {
  override def findClass(fullClassName: String): Class[_] = {
    val file = fullClassName.split('.').last   ".class"
    val in = new FileInputStream(file)
    val bos = new ByteArrayOutputStream
    val bytes = new Array[Byte](4096)
    var done = false
    while (!done) {
      val num = in.read(bytes)
      if (num
声明:文章版权归原作者所有 部分文章转自互联网 如有侵权请联系 [邮箱地址] 删除

路过

雷人

握手

鲜花

鸡蛋

相关分类

返回顶部