An Analysis of Spark's Storage Mechanism

Background

Apache Spark is a distributed computing framework that has attracted a lot of attention recently, and for many workloads it is much faster than Hadoop. One of the secrets behind Spark's performance is that it caches a large share of intermediate results in memory instead of writing them straight to disk, and the design of its RDD abstraction ensures that this cached data gets reused heavily. In this post I analyze Spark's storage subsystem, focusing on its caching policy; communication between nodes is only touched on where necessary.

Overview of the Relevant Classes

RDD is the class users actually operate on, and it is also the entry point of the caching mechanism. CacheManager is the intermediate layer between an RDD and the actual lookup: it passes the RDD's information down to BlockManager, and it also makes sure that a node never loads the same RDD partition twice, i.e. it provides the concurrency control. BlockManager exposes the actual query interface and manages the concrete storage locations through three classes: MemoryStore, DiskStore and TachyonStore.

The Storage Mechanism, Step by Step

1. RDD.iterator is the entry point of the cache-read path.

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

As the code shows, if the storage level is anything other than NONE, the lookup goes through the cache with a Partition as the unit of caching; otherwise the partition is recomputed (or read from a checkpoint) by computeOrReadCheckpoint.
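As a usage illustration, this path is triggered simply by setting a storage level before running actions. A minimal sketch against the public RDD API, assuming an existing SparkContext named sc (the data here is arbitrary):

import org.apache.spark.storage.StorageLevel

val doubled = sc.parallelize(1 to 1000000).map(_ * 2)
doubled.persist(StorageLevel.MEMORY_AND_DISK)  // storageLevel != NONE, so iterator() takes the CacheManager path
doubled.count()  // first action: partitions are computed and handed to BlockManager
doubled.count()  // second action: partitions are read back from the cache instead of being recomputed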
2. CacheManager.getOrCompute calls BlockManager.get to fetch the data. It is inside getOrCompute that the Partition of the top-level abstraction gets tied to a Block in the storage layer.

def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {

  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  blockManager.get(key) match {
    case Some(blockResult) =>
      // Partition is already materialized, so just return its values
      context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
      new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

    case None =>
      // Acquire a lock for loading this partition
      // If another thread already holds the lock, wait for it to finish return its results
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }

      // Otherwise, we have to load the partition ourselves
      try {
        logInfo(s"Partition $key not found, computing it")
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)

        // If the task is running locally, do not persist the result
        if (context.runningLocally) {
          return computedValues
        }

        // Otherwise, cache the values and keep track of any updates in block statuses
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)

      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}

First, the Partition to be looked up is turned into a BlockId via RDDBlockId(rdd.id, partition.index), and then BlockManager.get is called to do the lookup. If the lookup hits, the read metrics are recorded in the task's TaskMetrics. These two functions also show that even when the storage level is not NONE, the lookup can still miss. Because several tasks may try to load the same partition concurrently, a lock is taken around the load. On a miss, the partition is ultimately computed by RDD.computeOrReadCheckpoint. If the task is running locally, the computed values are returned directly; otherwise putInBlockManager uploads them to the cache, and the updated block statuses are tracked so the cache bookkeeping stays consistent.
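The "lock" above is not a lock object: CacheManager keeps a set of keys that are currently being loaded and coordinates threads with wait/notifyAll on that set (this is what acquireLockForPartition and the finally block operate on). Below is a simplified, self-contained sketch of that pattern; the object name, the cache map and the compute parameter are illustrative, not Spark's actual code:

import scala.collection.mutable

// Only one thread computes a given key; the others wait on the `loading` set
// and then reuse the cached result once the first thread has finished.
object LoadingCoordinator {
  private val loading = new mutable.HashSet[String]
  private val cache   = new mutable.HashMap[String, Seq[Int]]

  def getOrCompute(key: String)(compute: => Seq[Int]): Seq[Int] = {
    val alreadyCached: Option[Seq[Int]] = loading.synchronized {
      while (loading.contains(key)) loading.wait()  // another thread is loading this key; wait for it
      val hit = cache.get(key)
      if (hit.isEmpty) loading.add(key)             // cache miss: record that *we* are now loading it
      hit
    }
    alreadyCached.getOrElse {
      try {
        val values = compute                        // corresponds to computeOrReadCheckpoint + put
        loading.synchronized { cache(key) = values }
        values
      } finally {
        loading.synchronized { loading.remove(key); loading.notifyAll() }
      }
    }
  }
}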

private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

  val putLevel = effectiveStorageLevel.getOrElse(level)
  if (!putLevel.useMemory) {
    /*
     * If the storage level does not use memory, the computed values can be handed to
     * BlockManager directly as an iterator, via putIterator; otherwise they must first be
     * registered with MemoryStore. After the put, the block is read back once to make sure
     * it was actually cached.
     */
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        logInfo(s"Failure to store $key")
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    /*
     * If the block is cached in memory, we cannot just pass the iterator on: this partition
     * might be dropped from memory before it is read again, which would invalidate the
     * iterator. Instead the whole array is stored via putArray. The values must first be
     * unrolled in MemoryStore, because memory may run short; in that case a suitable
     * partition is dropped to disk. That selection is performed by MemoryStore.unrollSafely.
     */
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // We have successfully unrolled the entire partition, so cache it in memory
        updatedBlocks ++=
          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // There is not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        if (putLevel.useDisk) {
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}

3. BlockManager fetches data either locally, via getLocal, or from another node, via getRemote.
getLocal is just a wrapper around doGetLocal. doGetLocal first obtains the BlockInfo for the given blockId, reads the block's storage level from it, and then enters the corresponding branch: memory, tachyon or disk (the first two are both essentially in-memory storage). In the disk branch, after the bytes have been read, the method additionally checks whether the block's storage level also includes memory (meaning it was dropped from memory earlier); if so, the block is loaded back into memory.

private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {
      if (blockInfo.get(blockId).isEmpty) {
        logWarning(s"Block $blockId had been removed")
        return None
      }
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning(s"Block $blockId was marked as failure.")
        return None
      }

      val level = info.level
      logDebug(s"Level for block $blockId is $level")

      if (level.useMemory) {
        logDebug(s"Getting block $blockId from memory")
        val result = if (asBlockResult) {
          memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
        } else {
          memoryStore.getBytes(blockId)
        }
        result match {
          case Some(values) =>
            return result
          case None =>
            logDebug(s"Block $blockId not found in memory")
        }
      }

      if (level.useOffHeap) {
        logDebug(s"Getting block $blockId from tachyon")
        if (tachyonStore.contains(blockId)) {
          tachyonStore.getBytes(blockId) match {
            case Some(bytes) =>
              if (!asBlockResult) {
                return Some(bytes)
              } else {
                return Some(new BlockResult(
                  dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
              }
            case None =>
              logDebug(s"Block $blockId not found in tachyon")
          }
        }
      }

      if (level.useDisk) {
        logDebug(s"Getting block $blockId from disk")
        val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
          case Some(b) => b
          case None =>
            throw new BlockException(
              blockId, s"Block $blockId not found on disk, though it should be")
        }
        assert(0 == bytes.position())

        if (!level.useMemory) {
          // If the block shouldn't be stored in memory, we can just return it
          if (asBlockResult) {
            return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
              info.size))
          } else {
            return Some(bytes)
          }
        } else {
          if (!level.deserialized || !asBlockResult) {
            val copyForMemory = ByteBuffer.allocate(bytes.limit)
            copyForMemory.put(bytes)
            memoryStore.putBytes(blockId, copyForMemory, level)
            bytes.rewind()
          }
          if (!asBlockResult) {
            return Some(bytes)
          } else {
            val values = dataDeserialize(blockId, bytes)
            if (level.deserialized) {
              // Cache the values before returning them
              val putResult = memoryStore.putIterator(
                blockId, values, level, returnValues = true, allowPersistToDisk = false)
              putResult.data match {
                case Left(it) =>
                  return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
                case _ =>
                  throw new SparkException("Memory store did not return an iterator!")
              }
            } else {
              return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
            }
          }
        }
      }
    }
  } else {
    logDebug(s"Block $blockId not registered locally")
  }
  None
}

During a lookup, BlockManager does not call the low-level storage code directly; everything goes through the three delegate classes MemoryStore, DiskStore and TachyonStore.
getRemote is likewise a wrapper around doGetRemote. The remote path is fairly simple: obtain the block's BlockInfo, look up the block's locations in the cluster, then send the request to those locations one after another; as soon as any remote node returns the data, the lookup is finished.
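A minimal sketch of that remote-read loop (purely illustrative: locations, fetch and the byte-array result stand in for the BlockManagerMaster lookup and the actual network fetch in Spark):

// Try each known location in turn; the first executor that still holds the block wins.
def getRemoteSketch(
    blockId: String,
    locations: Seq[String],
    fetch: (String, String) => Option[Array[Byte]]): Option[Array[Byte]] = {
  for (loc <- locations) {
    fetch(loc, blockId) match {
      case Some(bytes) => return Some(bytes)  // a remote node returned the data: done
      case None        =>                     // that node no longer has it; try the next location
    }
  }
  None                                        // no location could serve the block
}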

4. Next, the put side. In step 2 we saw that BlockManager exposes two interfaces for storing data, putArray and putIterator; a look at the source shows that both are thin wrappers around doPut.

private def doPut(
    blockId: BlockId,
    data: BlockValues,
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None)
  : Seq[(BlockId, BlockStatus)] = {
  require(blockId != null, "BlockId is null")
  require(level != null && level.isValid, "StorageLevel is null or invalid")
  effectiveStorageLevel.foreach { level =>
    require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
  }

  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  /* Remember the block's storage level so that we can correctly drop it to disk if it needs
   * to be dropped right after it got put into memory. Note, however, that other threads will
   * not be able to get() this block until we call markReady on its BlockInfo. */

  val putBlockInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
        return updatedBlocks
      }
      // TODO: So the block info exists - but previous attempt to load it (?) failed.
      // What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  val startTimeMs = System.currentTimeMillis

  /* If we're storing values and we need to replicate the data, we'll want access to the values,
   * but because our put will read the whole iterator, there will be no values left. For the
   * case where the put serializes data, we'll remember the bytes, above; but for the case where
   * it doesn't, such as deserialized storage, let's rely on the put returning an Iterator. */

  var valuesAfterPut: Iterator[Any] = null

  // Ditto for the bytes after the put
  var bytesAfterPut: ByteBuffer = null

  // Size of the block in bytes
  var size = 0L

  // The level we actually use to put the block
  val putLevel = effectiveStorageLevel.getOrElse(level)

  // If we're storing bytes, then initiate the replication before storing them locally.
  // This is faster as data is already serialized and ready to send.
  val replicationFuture = data match {
    case b: ByteBufferValues if putLevel.replication > 1 =>
      // Duplicate doesn't copy the bytes, but just creates a wrapper
      val bufferView = b.buffer.duplicate()
      Future { replicate(blockId, bufferView, putLevel) }
    case _ => null
  }

  putBlockInfo.synchronized {
    logTrace("Put for block %s took %s to get into synchronized block"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))

    var marked = false
    try {
      // returnValues - Whether to return the values put
      // blockStore - The type of storage to put these values into
      val (returnValues, blockStore: BlockStore) = {
        if (putLevel.useMemory) {
          // Put it in memory first, even if it also has useDisk set to true;
          // We will drop it to disk later if the memory store can't hold it.
          (true, memoryStore)
        } else if (putLevel.useOffHeap) {
          // Use tachyon for off-heap storage
          (false, tachyonStore)
        } else if (putLevel.useDisk) {
          // Don't get back the bytes from put unless we replicate them
          (putLevel.replication > 1, diskStore)
        } else {
          assert(putLevel == StorageLevel.NONE)
          throw new BlockException(
            blockId, s"Attempted to put block $blockId without specifying storage level!")
        }
      }

      // Actually put the values
      val result = data match {
        case IteratorValues(iterator) =>
          blockStore.putIterator(blockId, iterator, putLevel, returnValues)
        case ArrayValues(array) =>
          blockStore.putArray(blockId, array, putLevel, returnValues)
        case ByteBufferValues(bytes) =>
          bytes.rewind()
          blockStore.putBytes(blockId, bytes, putLevel)
      }
      size = result.size
      result.data match {
        case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
        case Right (newBytes) => bytesAfterPut = newBytes
        case _ =>
      }

      // Keep track of which blocks are dropped from memory
      if (putLevel.useMemory) {
        result.droppedBlocks.foreach { updatedBlocks += _ }
      }

      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
      if (putBlockStatus.storageLevel != StorageLevel.NONE) {
        // Now that the block is in either the memory, tachyon, or disk store,
        // let other threads read it, and tell the master about it.
        marked = true
        putBlockInfo.markReady(size)
        if (tellMaster) {
          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
        }
        updatedBlocks += ((blockId, putBlockStatus))
      }
    } finally {
      // If we failed in putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (!marked) {
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        putBlockInfo.markFailure()
        logWarning(s"Putting block $blockId failed")
      }
    }
  }
  logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))

  // Either we're storing bytes and we asynchronously started replication, or we're storing
  // values and need to serialize and replicate them now:
  if (putLevel.replication > 1) {
    data match {
      case ByteBufferValues(bytes) =>
        if (replicationFuture != null) {
          Await.ready(replicationFuture, Duration.Inf)
        }
      case _ =>
        val remoteStartTime = System.currentTimeMillis
        // Serialize the block if not already done
        if (bytesAfterPut == null) {
          if (valuesAfterPut == null) {
            throw new SparkException(
              "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
          }
          bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
        }
        replicate(blockId, bytesAfterPut, putLevel)
        logDebug("Put block %s remotely took %s"
          .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
    }
  }

  BlockManager.dispose(bytesAfterPut)

  if (putLevel.replication > 1) {
    logDebug("Putting block %s with replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  } else {
    logDebug("Putting block %s without replication took %s"
      .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  }

  updatedBlocks
}

From the code we can see that doPut first creates a BlockInfo to hold the block's metadata; until that info is marked ready, other threads cannot read the block, so it is effectively locked.
Then, depending on the block's replication factor, replication to remote nodes may be kicked off.
Finally the block is written to memory, tachyon or disk according to its storage level, the BlockInfo is marked ready ("unlocked") so the block becomes visible to readers, and the master is notified if requested.
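The "locking" here is the BlockInfo ready/failure handshake: readers block in waitForReady() until the writer marks the info ready or failed. A simplified sketch of that pattern (not Spark's actual BlockInfo class):

// Readers call waitForReady() and are blocked until the writer calls markReady() or markFailure();
// the boolean result tells the reader whether the block was successfully written.
class BlockInfoSketch {
  private var pending = true
  private var failed  = false

  def waitForReady(): Boolean = synchronized {
    while (pending) wait()
    !failed
  }

  def markReady(): Unit = synchronized { pending = false; notifyAll() }

  def markFailure(): Unit = synchronized { pending = false; failed = true; notifyAll() }
}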

5. Finally, let's see how the three store classes work. Since TachyonStore involves the HDFS-related implementation, I will leave it aside for now and start with DiskStore. Even DiskStore does not touch the underlying files directly; it delegates path management to DiskBlockManager (yes, yet another layer of abstraction). That class is not complicated, though: it keeps an array of subdirectories and hashes each file name into one of them, and file paths are resolved through getFile.

def getFile(filename: String): File = {
  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist
  var subDir = subDirs(dirId)(subDirId)
  if (subDir == null) {
    subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        newDir.mkdir()
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }
  }
  new File(subDir, filename)
}

A hash is computed from the file name (which encodes the block id); taking it modulo the number of local directories gives dirId, and a second modulo gives subDirId. The corresponding subDir is looked up in subDirs and created if it does not exist yet. Finally a File handle is built from subDir and the file name, and DiskStore uses this handle to write the block to a file or read it back.
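As a worked example (the directory counts below are assumptions for illustration, not necessarily your configuration):

// Reproduce the two-level hashing for a hypothetical block file name, assuming
// 2 configured local directories and 64 subdirectories per local directory.
val localDirsCount     = 2
val subDirsPerLocalDir = 64
val filename = "rdd_3_7"                    // an RDD block rendered as a file name
val hash     = math.abs(filename.hashCode)  // stand-in for Utils.nonNegativeHash
val dirId    = hash % localDirsCount
val subDirId = (hash / localDirsCount) % subDirsPerLocalDir
val subDirName = "%02x".format(subDirId)    // e.g. "1f"
println(s"$filename -> localDirs($dirId)/$subDirName/$filename")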

Last, let's look at MemoryStore.
Compared with the disk side, MemoryStore is much simpler: there are no "dirty" operations such as creating directories and files or reading them back (which, I think, is also why memory has no separate manager class). It only maintains a LinkedHashMap[BlockId, MemoryEntry] mapping each blockId to a MemoryEntry that holds the cached value. Reads are trivial: look the entry up in the map and return it. For writes there are several methods such as putBytes and putArray, but a look at the source shows that they are all wrappers around tryToPut.
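One detail worth noting: entries is a java.util.LinkedHashMap created with accessOrder = true (the third constructor argument), which is what gives the eviction order discussed below. A tiny demonstration of that property:

import java.util.LinkedHashMap

// With accessOrder = true, iteration goes from least-recently-accessed to most-recently-accessed,
// so walking the map front-to-back picks eviction victims in (approximately) LRU order.
val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
entries.put("rdd_0_0", 100L)
entries.put("rdd_0_1", 100L)
entries.put("rdd_0_2", 100L)
entries.get("rdd_0_0")                        // touching rdd_0_0 moves it to the most-recent end

val it = entries.entrySet().iterator()
while (it.hasNext) println(it.next().getKey)  // prints rdd_0_1, rdd_0_2, rdd_0_0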

/**
 * Try to put in a set of values, if we can free up enough space. The value should either be
 * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
 * must also be passed by the caller.
 *
 * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
 * dropping is done by only on thread at a time. Otherwise while one thread is dropping
 * blocks to free memory for one block, another thread may use up the freed space for
 * another block.
 *
 * Return whether put was successful, along with the blocks dropped in the process.
 */
private def tryToPut(
    blockId: BlockId,
    value: Any,
    size: Long,
    deserialized: Boolean): ResultWithDroppedBlocks = {

  /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
   * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
   * been released, it must be ensured that those to-be-dropped blocks are not double counted
   * for freeing up more space for another block that needs to be put. Only then the actually
   * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */

  var putSuccess = false
  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  accountingLock.synchronized {
    val freeSpaceResult = ensureFreeSpace(blockId, size)
    val enoughFreeSpace = freeSpaceResult.success
    droppedBlocks ++= freeSpaceResult.droppedBlocks

    if (enoughFreeSpace) {
      val entry = new MemoryEntry(value, size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
        currentMemory += size
      }
      val valuesOrBytes = if (deserialized) "values" else "bytes"
      logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
        blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      putSuccess = true
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      val data = if (deserialized) {
        Left(value.asInstanceOf[Array[Any]])
      } else {
        Right(value.asInstanceOf[ByteBuffer].duplicate())
      }
      val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
      droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
    }
  }
  ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}

So tryToPut first calls ensureFreeSpace to try to free up enough room, and then splits into two branches depending on whether enough memory is available without further eviction. If there is enough memory, the data is written into memory and the entry is added to the entries map. If not, the block is handed back to BlockManager to be dropped to disk (when its storage level allows disk). Which blocks get evicted, then, and when does memory run short in the first place? Let's look at ensureFreeSpace.

/**
 * Try to free up a given amount of space to store a particular block, but can fail if
 * either the block is bigger than our memory or it would require replacing another block
 * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
 * don't fit into memory that we want to avoid).
 *
 * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping
 * blocks. Otherwise, the freed space may fill up before the caller puts in their new value.
 *
 * Return whether there is enough free space, along with the blocks dropped in the process.
 */
private def ensureFreeSpace(
    blockIdToAdd: BlockId,
    space: Long): ResultWithDroppedBlocks = {
  logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")

  val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

  if (space > maxMemory) {
    logInfo(s"Will not store $blockIdToAdd as it is larger than our memory limit")
    return ResultWithDroppedBlocks(success = false, droppedBlocks)
  }

  // Take into account the amount of memory currently occupied by unrolling blocks
  val actualFreeMemory = freeMemory - currentUnrollMemory

  if (actualFreeMemory < space) {
    val rddToAdd = getRddId(blockIdToAdd)
    val selectedBlocks = new ArrayBuffer[BlockId]
    var selectedMemory = 0L

    // This is synchronized to ensure that the set of entries is not changed
    // (because of getValue or getBytes) while traversing the iterator, as that
    // can lead to exceptions.
    entries.synchronized {
      val iterator = entries.entrySet().iterator()
      while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
        val pair = iterator.next()
        val blockId = pair.getKey
        if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
          selectedBlocks += blockId
          selectedMemory += pair.getValue.size
        }
      }
    }

    if (actualFreeMemory + selectedMemory >= space) {
      logInfo(s"${selectedBlocks.size} blocks selected for dropping")
      for (blockId <- selectedBlocks) {
        val entry = entries.synchronized { entries.get(blockId) }
        // This should never be null as only one thread should be dropping
        // blocks and removing entries. However the check is still here for
        // future safety.
        if (entry != null) {
          val data = if (entry.deserialized) {
            Left(entry.value.asInstanceOf[Array[Any]])
          } else {
            Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
          }
          val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
          droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
        }
      }
      return ResultWithDroppedBlocks(success = true, droppedBlocks)
    } else {
      logInfo(s"Will not store $blockIdToAdd as it would require dropping another block " +
        "from the same RDD")
      return ResultWithDroppedBlocks(success = false, droppedBlocks)
    }
  }
  ResultWithDroppedBlocks(success = true, droppedBlocks)
}

From this code we can see that ensureFreeSpace builds a selectedBlocks array holding the blocks that may be evicted, with selectedMemory tracking the total space they would free. The candidates are gathered by walking the entries map in iteration order and adding every block that does not belong to the RDD currently being added; refusing to evict blocks of that same RDD protects the RDD being cached and avoids a wasteful cyclic replacement pattern. Since entries is an access-ordered LinkedHashMap, this traversal visits the least recently used blocks first, so the effective eviction order is LRU rather than strict FIFO. Once the candidates are collected, the method checks whether dropping all of them would free enough space; if not, it gives up and returns failure. Otherwise it drops each selected block from memory (writing it to disk when its storage level allows), producing the required free space.

Caching Policy

From this walk through the source we can see that once the user has assigned a storage level to an RDD, calling RDD.iterator triggers the caching machinery automatically, and the default level set by cache() is memory only. Reading from the cache is just as automatic and needs no management by the user. When memory fills up, blocks are evicted to make room while keeping the RDD currently being cached intact as far as possible: a block whose storage level allows disk is written out to disk, and otherwise it is simply discarded and recomputed on its next access. If a block that was pushed to disk is used again and its storage level includes memory, it is automatically reloaded into memory. The eviction order itself comes almost for free from the access-ordered LinkedHashMap that MemoryStore maintains, so the policy is essentially LRU combined with the rule of never evicting the RDD currently being written; it is very cheap to implement and fits Spark's access patterns, in which the blocks of an RDD tend to be reused together.
