这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
11 changes: 10 additions & 1 deletion Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ let package = Package(
.package(url: "https://github.com/orchetect/SwiftRadix", from: "1.3.1"),
.package(url: "https://github.com/groue/Semaphore", from: "0.0.8"),
.package(url: "https://github.com/fumoboy007/swift-retry", from: "0.2.3"),
.package(url: "https://github.com/jozefizso/swift-xattr", from: "3.0.0"),
],
targets: [
.executableTarget(name: "tart", dependencies: [
Expand All @@ -40,6 +41,7 @@ let package = Package(
.product(name: "SwiftRadix", package: "SwiftRadix"),
.product(name: "Semaphore", package: "Semaphore"),
.product(name: "DMRetry", package: "swift-retry"),
.product(name: "XAttr", package: "swift-xattr"),
], exclude: [
"OCI/Reference/Makefile",
"OCI/Reference/Reference.g4",
Expand Down
5 changes: 3 additions & 2 deletions Sources/tart/Commands/List.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ fileprivate struct VMInfo: Encodable {
let Name: String
let Disk: Int
let Size: Int
let SizeOnDisk: Int
let Running: Bool
let State: String
}
Expand Down Expand Up @@ -38,13 +39,13 @@ struct List: AsyncParsableCommand {

if source == nil || source == "local" {
infos += sortedInfos(try VMStorageLocal().list().map { (name, vmDir) in
try VMInfo(Source: "local", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
try VMInfo(Source: "local", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), SizeOnDisk: vmDir.allocatedSizeGB() - vmDir.deduplicatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
})
}

if source == nil || source == "oci" {
infos += sortedInfos(try VMStorageOCI().list().map { (name, vmDir, _) in
try VMInfo(Source: "OCI", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
try VMInfo(Source: "OCI", Name: name, Disk: vmDir.sizeGB(), Size: vmDir.allocatedSizeGB(), SizeOnDisk: vmDir.allocatedSizeGB() - vmDir.deduplicatedSizeGB(), Running: vmDir.running(), State: vmDir.state().rawValue)
})
}

Expand Down
10 changes: 9 additions & 1 deletion Sources/tart/LocalLayerCache.swift
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import Foundation

struct LocalLayerCache {
let name: String
let deduplicatedBytes: UInt64
let diskURL: URL

private let mappedDisk: Data
private var digestToRange: [String : Range<Data.Index>] = [:]

init?(_ diskURL: URL, _ manifest: OCIManifest) throws {
init?(_ name: String, _ deduplicatedBytes: UInt64, _ diskURL: URL, _ manifest: OCIManifest) throws {
self.name = name
self.deduplicatedBytes = deduplicatedBytes
self.diskURL = diskURL

// mmap(2) the disk that contains the layers from the manifest
self.mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])

Expand Down
72 changes: 65 additions & 7 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Foundation
import Compression
import System

class DiskV2: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
Expand Down Expand Up @@ -37,8 +38,17 @@ class DiskV2: Disk {
// Support resumable pulls
let pullResumed = FileManager.default.fileExists(atPath: diskURL.path)

if !pullResumed && !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
throw OCIError.FailedToCreateVmFile
if !pullResumed {
if let localLayerCache = localLayerCache {
// Clone the local layer cache's disk and use it as a base, potentially
// reducing the space usage since some blocks won't be written at all
try FileManager.default.copyItem(at: localLayerCache.diskURL, to: diskURL)
} else {
// Otherwise create an empty disk
if !FileManager.default.createFile(atPath: diskURL.path, contents: nil) {
throw OCIError.FailedToCreateVmFile
}
}
}

// Calculate the uncompressed disk size
Expand All @@ -58,6 +68,15 @@ class DiskV2: Disk {
try disk.truncate(atOffset: uncompressedDiskSize)
try disk.close()

// Determine the file system block size
var st = stat()
if stat(diskURL.path, &st) == -1 {
let details = Errno(rawValue: errno)

throw RuntimeError.PullFailed("failed to stat(2) disk \(diskURL.path): \(details)")
}
let fsBlockSize = UInt64(st.st_blksize)

// Concurrently fetch and decompress layers
try await withThrowingTaskGroup(of: Void.self) { group in
var globalDiskWritingOffset: UInt64 = 0
Expand Down Expand Up @@ -89,13 +108,21 @@ class DiskV2: Disk {
return
}

// Open the disk file
// Open the disk file for writing
let disk = try FileHandle(forWritingTo: diskURL)

// Also open the disk file for reading and verifying
// its contents in case the local layer cache is used
let rdisk: FileHandle? = if localLayerCache != nil {
try FileHandle(forReadingFrom: diskURL)
} else {
nil
}

// Check if we already have this layer contents in the local layer cache
if let localLayerCache = localLayerCache, let data = localLayerCache.find(diskLayer.digest), Digest.hash(data) == uncompressedLayerContentDigest {
// Fulfil the layer contents from the local blob cache
_ = try zeroSkippingWrite(disk, diskWritingOffset, data)
_ = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
try disk.close()

// Update the progress
Expand All @@ -112,7 +139,7 @@ class DiskV2: Disk {
return
}

diskWritingOffset = try zeroSkippingWrite(disk, diskWritingOffset, data)
diskWritingOffset = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data)
}

try await registry.pullBlob(diskLayer.digest) { data in
Expand All @@ -132,7 +159,7 @@ class DiskV2: Disk {
}
}

private static func zeroSkippingWrite(_ disk: FileHandle, _ offset: UInt64, _ data: Data) throws -> UInt64 {
private static func zeroSkippingWrite(_ disk: FileHandle, _ rdisk: FileHandle?, _ fsBlockSize: UInt64, _ offset: UInt64, _ data: Data) throws -> UInt64 {
let holeGranularityBytes = 64 * 1024

// A zero chunk for faster than byte-by-byte comparisons
Expand All @@ -152,7 +179,38 @@ class DiskV2: Disk {
var offset = offset

for chunk in data.chunks(ofCount: holeGranularityBytes) {
// Only write chunks that are not zero
// If the local layer cache is used, only write chunks that differ
// since the base disk can contain anything at any position
if let rdisk = rdisk {
// F_PUNCHHOLE requires the holes to be aligned to file system block boundaries
let isHoleAligned = (offset % fsBlockSize) == 0 && (UInt64(chunk.count) % fsBlockSize) == 0

if isHoleAligned && chunk == zeroChunk {
var arg = fpunchhole_t(fp_flags: 0, reserved: 0, fp_offset: off_t(offset), fp_length: off_t(chunk.count))

if fcntl(disk.fileDescriptor, F_PUNCHHOLE, &arg) == -1 {
let details = Errno(rawValue: errno)

throw RuntimeError.PullFailed("failed to punch hole: \(details)")
}
} else {
try rdisk.seek(toOffset: offset)
let actualContentsOnDisk = try rdisk.read(upToCount: chunk.count)

if chunk != actualContentsOnDisk {
try disk.seek(toOffset: offset)
disk.write(chunk)
}
}

offset += UInt64(chunk.count)

continue
}

// Otherwise, only write chunks that are not zero
// since the base disk is created from scratch and
// is zeroed via truncate(2)
if chunk != zeroChunk {
try disk.seek(toOffset: offset)
disk.write(chunk)
Expand Down
26 changes: 26 additions & 0 deletions Sources/tart/URL+Prunable.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Foundation
import XAttr

extension URL: Prunable {
var url: URL {
Expand All @@ -13,7 +14,32 @@ extension URL: Prunable {
try resourceValues(forKeys: [.totalFileAllocatedSizeKey]).totalFileAllocatedSize!
}

/// Reports how many bytes of this file were recorded as deduplicated.
///
/// The recorded figure (see `deduplicatedBytes()`) is only returned while the
/// file system reports that the file may share content with another file;
/// in every other case the result is 0.
///
/// - Returns: The recorded deduplicated byte count, clamped to `Int.max`, or 0.
/// - Throws: Any error raised while reading the URL's resource values.
func deduplicatedSizeBytes() throws -> Int {
  let values = try resourceValues(forKeys: [.mayShareFileContentKey])

  // Only trust the recorded figure while the file may still share content
  // (presumably with the base image it was cloned from — confirm against the pull logic)
  guard values.mayShareFileContent == true else {
    return 0
  }

  // The recorded value comes from an extended attribute that anything can
  // rewrite, so clamp instead of trapping on values larger than Int.max
  return Int(clamping: deduplicatedBytes())
}

/// Returns the total file size, in bytes, reported for this URL.
///
/// - Throws: Any error raised while reading the URL's resource values.
func sizeBytes() throws -> Int {
  let values = try resourceValues(forKeys: [.totalFileSizeKey])

  // The key was requested explicitly above, hence the force-unwrap
  return values.totalFileSize!
}

/// Records `size` on this file as a decimal string stored in the
/// "run.tart.deduplicated-bytes" extended attribute.
///
/// Traps (via `try!`) if the extended attribute cannot be written.
///
/// - Parameter size: Number of deduplicated bytes to remember.
func setDeduplicatedBytes(_ size: UInt64) {
  // Decimal text representation of the byte count, UTF-8 encoded
  let encodedSize = Data("\(size)".utf8)

  try! self.setExtendedAttribute(name: "run.tart.deduplicated-bytes", value: encodedSize)
}

/// Reads the byte count stored in the "run.tart.deduplicated-bytes"
/// extended attribute of this file.
///
/// - Returns: The parsed value, or 0 when the attribute cannot be read,
///   is not valid UTF-8, or does not parse as an unsigned integer.
func deduplicatedBytes() -> UInt64 {
  guard
    let rawValue = try? self.extendedAttributeValue(forName: "run.tart.deduplicated-bytes"),
    let text = String(data: rawValue, encoding: .utf8),
    let parsed = UInt64(text)
  else {
    // Missing or malformed attribute is treated as "nothing deduplicated"
    return 0
  }

  return parsed
}
}
10 changes: 6 additions & 4 deletions Sources/tart/VMDirectory+OCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ enum OCIError: Error {
}

extension VMDirectory {
private static let bufferSizeBytes = 64 * 1024 * 1024
private static let layerLimitBytes = 500 * 1000 * 1000

func pullFromRegistry(registry: Registry, manifest: OCIManifest, concurrency: UInt, localLayerCache: LocalLayerCache?) async throws {
// Pull VM's config file layer and re-serialize it into a config file
let configLayers = manifest.layers.filter {
Expand Down Expand Up @@ -62,6 +59,11 @@ extension VMDirectory {
throw RuntimeError.PullFailed("failed to decompress disk: \(error.localizedDescription)")
}

if let llc = localLayerCache {
// set custom attribute to remember deduplicated bytes
diskURL.setDeduplicatedBytes(llc.deduplicatedBytes)
}

// Pull VM's NVRAM file layer and store it in an NVRAM file
defaultLogger.appendNewLine("pulling NVRAM...")

Expand All @@ -80,7 +82,7 @@ extension VMDirectory {
}
try nvram.close()

// Serialize VM's manifest to enable better de-duplication on subsequent "tart pull"'s
// Serialize VM's manifest to enable better deduplication on subsequent "tart pull"'s
try manifest.toJSON().write(to: manifestURL)
}

Expand Down
8 changes: 8 additions & 0 deletions Sources/tart/VMDirectory.swift
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ struct VMDirectory: Prunable {
try allocatedSizeBytes() / 1000 / 1000 / 1000
}

/// Sums the deduplicated byte counts reported for the VM's config, disk and NVRAM files.
///
/// - Throws: Whatever any of the per-file lookups throws.
func deduplicatedSizeBytes() throws -> Int {
  let configBytes = try configURL.deduplicatedSizeBytes()
  let diskBytes = try diskURL.deduplicatedSizeBytes()
  let nvramBytes = try nvramURL.deduplicatedSizeBytes()

  return configBytes + diskBytes + nvramBytes
}

/// Returns the deduplicated size in whole decimal gigabytes (10^9 bytes),
/// discarding any remainder.
///
/// - Throws: Whatever `deduplicatedSizeBytes()` throws.
func deduplicatedSizeGB() throws -> Int {
  let bytes = try deduplicatedSizeBytes()

  // Same result as dividing by 1000 three times in a row
  return bytes / 1_000_000_000
}

/// Sums the sizes, in bytes, reported for the VM's config, disk and NVRAM files.
///
/// - Throws: Whatever any of the per-file lookups throws.
func sizeBytes() throws -> Int {
  let configBytes = try configURL.sizeBytes()
  let diskBytes = try diskURL.sizeBytes()
  let nvramBytes = try nvramURL.sizeBytes()

  return configBytes + diskBytes + nvramBytes
}
Expand Down
26 changes: 19 additions & 7 deletions Sources/tart/VMStorageOCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,18 @@ class VMStorageOCI: PrunableStorage {
// Choose the best base image which has the most deduplication ratio
let localLayerCache = try await chooseLocalLayerCache(name, manifest, registry)

if let llc = localLayerCache {
let deduplicatedHuman = ByteCountFormatter.string(fromByteCount: Int64(llc.deduplicatedBytes), countStyle: .file)

defaultLogger.appendNewLine("found an image \(llc.name) that will allow us to deduplicate \(deduplicatedHuman), using it as a base...")
}

try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache)
} recoverFromFailure: { error in
if error is RuntimeError {
return .throw
}

print("Error: \(error.localizedDescription)")
print("Attempting to re-try...")

Expand Down Expand Up @@ -246,15 +256,15 @@ class VMStorageOCI: PrunableStorage {

func chooseLocalLayerCache(_ name: RemoteName, _ manifest: OCIManifest, _ registry: Registry) async throws -> LocalLayerCache? {
// Establish a closure that will calculate how many bytes
// we'll de-duplicate if we re-use the given manifest
// we'll deduplicate if we re-use the given manifest
let target = Swift.Set(manifest.layers)

let calculateDeduplicatedBytes = { (manifest: OCIManifest) -> Int in
target.intersection(manifest.layers).map({ $0.size }).reduce(0, +)
let calculateDeduplicatedBytes = { (manifest: OCIManifest) -> UInt64 in
target.intersection(manifest.layers).map({ UInt64($0.size) }).reduce(0, +)
}

// Load OCI VM images and their manifests (if present)
var candidates: [(name: String, vmDir: VMDirectory, manifest: OCIManifest, deduplicatedBytes: Int)] = []
var candidates: [(name: String, vmDir: VMDirectory, manifest: OCIManifest, deduplicatedBytes: UInt64)] = []

for (name, vmDir, isSymlink) in try list() {
if isSymlink {
Expand Down Expand Up @@ -285,13 +295,15 @@ class VMStorageOCI: PrunableStorage {
candidates.append((name.description, vmDir, manifest, calculateDeduplicatedBytes(manifest)))
}

// Now, find the best match based on how many bytes we'll de-duplicate
let choosen = candidates.max { left, right in
// Now, find the best match based on how many bytes we'll deduplicate
let choosen = candidates.filter {
$0.deduplicatedBytes > 0
}.max { left, right in
return left.deduplicatedBytes < right.deduplicatedBytes
}

return try choosen.flatMap({ choosen in
try LocalLayerCache(choosen.vmDir.diskURL, choosen.manifest)
try LocalLayerCache(choosen.name, choosen.deduplicatedBytes, choosen.vmDir.diskURL, choosen.manifest)
})
}
}
Expand Down