这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Sources/tart/Commands/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ struct Push: AsyncParsableCommand {
@Flag(help: "connect to the OCI registry via insecure HTTP protocol")
var insecure: Bool = false

@Option(help: "network concurrency to use when pushing a local VM to the OCI-compatible registry")
var concurrency: UInt = 4

@Option(help: ArgumentHelp("chunk size in MB if registry supports chunked uploads",
discussion: """
By default monolithic method is used for uploading blobs to the registry but some registries support a more efficient chunked method.
Expand Down Expand Up @@ -77,7 +80,8 @@ struct Push: AsyncParsableCommand {
registry: registry,
references: references,
chunkSizeMb: chunkSize,
diskFormat: diskFormat
diskFormat: diskFormat,
concurrency: concurrency
)
// Populate the local cache (if requested)
if populateCache {
Expand Down
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/Disk.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Foundation

protocol Disk {
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer]
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer]
static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
}
2 changes: 1 addition & 1 deletion Sources/tart/OCI/Layerizer/DiskV1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class DiskV1: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
private static let layerLimitBytes = 500 * 1000 * 1000

static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [OCIManifestLayer] = []

// Open the disk file
Expand Down
50 changes: 35 additions & 15 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,52 @@ class DiskV2: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
private static let layerLimitBytes = 512 * 1024 * 1024

static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [OCIManifestLayer] = []
static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [(index: Int, pushedLayer: OCIManifestLayer)] = []

// Open the disk file
let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])

// Compress the disk file as multiple individually decompressible streams,
// each equal ``Self.layerLimitBytes`` bytes or less due to LZ4 compression
for data in mappedDisk.chunks(ofCount: layerLimitBytes) {
let compressedData = try (data as NSData).compressed(using: .lz4) as Data
try await withThrowingTaskGroup(of: (Int, OCIManifestLayer).self) { group in
for (index, data) in mappedDisk.chunks(ofCount: layerLimitBytes).enumerated() {
// Respect the concurrency limit
if index >= concurrency {
if let (index, pushedLayer) = try await group.next() {
pushedLayers.append((index, pushedLayer))
}
}

// Launch a disk layer pushing task
group.addTask {
let compressedData = try (data as NSData).compressed(using: .lz4) as Data

let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)
let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)

pushedLayers.append(OCIManifestLayer(
mediaType: diskV2MediaType,
size: compressedData.count,
digest: layerDigest,
uncompressedSize: UInt64(data.count),
uncompressedContentDigest: Digest.hash(data)
))
// Update progress using a relative value
progress.completedUnitCount += Int64(data.count)

// Update progress using a relative value
progress.completedUnitCount += Int64(data.count)
return (index, OCIManifestLayer(
mediaType: diskV2MediaType,
size: compressedData.count,
digest: layerDigest,
uncompressedSize: UInt64(data.count),
uncompressedContentDigest: Digest.hash(data)
))
}
}

for try await pushedLayer in group {
pushedLayers.append(pushedLayer)
}
}

return pushedLayers
return pushedLayers.sorted {
$0.index < $1.index
}.map {
$0.pushedLayer
}
}

static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
Expand Down
6 changes: 3 additions & 3 deletions Sources/tart/VMDirectory+OCI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ extension VMDirectory {
try manifest.toJSON().write(to: manifestURL)
}

func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String) async throws -> RemoteName {
func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String, concurrency: UInt) async throws -> RemoteName {
var layers = Array<OCIManifestLayer>()

// Read VM's config and push it as blob
Expand All @@ -103,9 +103,9 @@ extension VMDirectory {

switch diskFormat {
case "v1":
layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, concurrency: concurrency, progress: progress))
case "v2":
layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, concurrency: concurrency, progress: progress))
default:
throw RuntimeError.OCIUnsupportedDiskFormat(diskFormat)
}
Expand Down
4 changes: 2 additions & 2 deletions Tests/TartTests/LayerizerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ final class LayerizerTests: XCTestCase {
let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)

print("pushing disk...")
let diskLayers = try await DiskV1.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress())
let diskLayers = try await DiskV1.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, concurrency: 4, progress: Progress())

print("pulling disk...")
try await DiskV1.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress())
Expand All @@ -57,7 +57,7 @@ final class LayerizerTests: XCTestCase {
let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)

print("pushing disk...")
let diskLayers = try await DiskV2.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress())
let diskLayers = try await DiskV2.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, concurrency: 4, progress: Progress())

print("pulling disk...")
try await DiskV2.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress())
Expand Down