diff --git a/Sources/tart/Commands/Push.swift b/Sources/tart/Commands/Push.swift
index da31dee8..ff4bf9e9 100644
--- a/Sources/tart/Commands/Push.swift
+++ b/Sources/tart/Commands/Push.swift
@@ -15,6 +15,9 @@ struct Push: AsyncParsableCommand {
   @Flag(help: "connect to the OCI registry via insecure HTTP protocol")
   var insecure: Bool = false
 
+  @Option(help: "network concurrency to use when pushing a local VM to the OCI-compatible registry")
+  var concurrency: UInt = 4
+
   @Option(help: ArgumentHelp("chunk size in MB if registry supports chunked uploads",
                              discussion: """
                              By default monolithic method is used for uploading blobs to the registry but some registries support a more efficient chunked method.
@@ -77,7 +80,8 @@ struct Push: AsyncParsableCommand {
           registry: registry,
           references: references,
           chunkSizeMb: chunkSize,
-          diskFormat: diskFormat
+          diskFormat: diskFormat,
+          concurrency: concurrency
         )
         // Populate the local cache (if requested)
         if populateCache {
diff --git a/Sources/tart/OCI/Layerizer/Disk.swift b/Sources/tart/OCI/Layerizer/Disk.swift
index b420181f..839b596e 100644
--- a/Sources/tart/OCI/Layerizer/Disk.swift
+++ b/Sources/tart/OCI/Layerizer/Disk.swift
@@ -1,6 +1,6 @@
 import Foundation
 
 protocol Disk {
-  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer]
+  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer]
   static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache?) async throws
 }
diff --git a/Sources/tart/OCI/Layerizer/DiskV1.swift b/Sources/tart/OCI/Layerizer/DiskV1.swift
index edfa7613..e52419dd 100644
--- a/Sources/tart/OCI/Layerizer/DiskV1.swift
+++ b/Sources/tart/OCI/Layerizer/DiskV1.swift
@@ -5,7 +5,7 @@ class DiskV1: Disk {
   private static let bufferSizeBytes = 4 * 1024 * 1024
   private static let layerLimitBytes = 500 * 1000 * 1000
 
-  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
+  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
     var pushedLayers: [OCIManifestLayer] = []
 
     // Open the disk file
diff --git a/Sources/tart/OCI/Layerizer/DiskV2.swift b/Sources/tart/OCI/Layerizer/DiskV2.swift
index dcdce7eb..8a74be05 100644
--- a/Sources/tart/OCI/Layerizer/DiskV2.swift
+++ b/Sources/tart/OCI/Layerizer/DiskV2.swift
@@ -5,32 +5,58 @@ class DiskV2: Disk {
   private static let bufferSizeBytes = 4 * 1024 * 1024
   private static let layerLimitBytes = 512 * 1024 * 1024
 
-  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, progress: Progress) async throws -> [OCIManifestLayer] {
-    var pushedLayers: [OCIManifestLayer] = []
+  /// Pushes the disk at `diskURL` to `registry` as individually LZ4-compressed layers of at most
+  /// `layerLimitBytes` uncompressed bytes each, keeping no more than `concurrency` pushes in flight
+  /// (a `concurrency` of 0 pushes one layer at a time). `progress` is advanced by each layer's
+  /// uncompressed size; the returned layers are ordered as their data appears on disk.
+  static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
+    var pushedLayers: [(index: Int, pushedLayer: OCIManifestLayer)] = []
 
     // Open the disk file
     let mappedDisk = try Data(contentsOf: diskURL, options: [.alwaysMapped])
 
     // Compress the disk file as multiple individually decompressible streams,
     // each equal ``Self.layerLimitBytes`` bytes or less due to LZ4 compression
-    for data in mappedDisk.chunks(ofCount: layerLimitBytes) {
-      let compressedData = try (data as NSData).compressed(using: .lz4) as Data
+    try await withThrowingTaskGroup(of: (Int, Int64, OCIManifestLayer).self) { group in
+      for (index, data) in mappedDisk.chunks(ofCount: layerLimitBytes).enumerated() {
+        // Respect the concurrency limit
+        if index >= concurrency {
+          if let (finishedIndex, finishedBytes, pushedLayer) = try await group.next() {
+            pushedLayers.append((finishedIndex, pushedLayer))
+            // Progress is only advanced from this parent task: "+=" is a read-modify-write,
+            // so doing it from concurrently running child tasks could lose updates
+            progress.completedUnitCount += finishedBytes
+          }
+        }
+
+        // Launch a disk layer pushing task
+        group.addTask {
+          let compressedData = try (data as NSData).compressed(using: .lz4) as Data
 
-      let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)
+          let layerDigest = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb)
 
-      pushedLayers.append(OCIManifestLayer(
-        mediaType: diskV2MediaType,
-        size: compressedData.count,
-        digest: layerDigest,
-        uncompressedSize: UInt64(data.count),
-        uncompressedContentDigest: Digest.hash(data)
-      ))
+          return (index, Int64(data.count), OCIManifestLayer(
+            mediaType: diskV2MediaType,
+            size: compressedData.count,
+            digest: layerDigest,
+            uncompressedSize: UInt64(data.count),
+            uncompressedContentDigest: Digest.hash(data)
+          ))
+        }
+      }
 
-      // Update progress using a relative value
-      progress.completedUnitCount += Int64(data.count)
+      // Collect the layers that are still in flight
+      for try await (finishedIndex, finishedBytes, pushedLayer) in group {
+        pushedLayers.append((finishedIndex, pushedLayer))
+        progress.completedUnitCount += finishedBytes
+      }
     }
 
-    return pushedLayers
+    return pushedLayers.sorted {
+      $0.index < $1.index
+    }.map {
+      $0.pushedLayer
+    }
   }
 
   static func pull(registry: Registry, diskLayers: [OCIManifestLayer], diskURL: URL, concurrency: UInt, progress: Progress, localLayerCache: LocalLayerCache? = nil) async throws {
diff --git a/Sources/tart/VMDirectory+OCI.swift b/Sources/tart/VMDirectory+OCI.swift
index 07c59225..59317b06 100644
--- a/Sources/tart/VMDirectory+OCI.swift
+++ b/Sources/tart/VMDirectory+OCI.swift
@@ -84,7 +84,7 @@ extension VMDirectory {
     try manifest.toJSON().write(to: manifestURL)
   }
 
-  func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String) async throws -> RemoteName {
+  func pushToRegistry(registry: Registry, references: [String], chunkSizeMb: Int, diskFormat: String, concurrency: UInt) async throws -> RemoteName {
     var layers = Array<OCIManifestLayer>()
 
     // Read VM's config and push it as blob
@@ -103,9 +103,9 @@ extension VMDirectory {
 
     switch diskFormat {
     case "v1":
-      layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
+      layers.append(contentsOf: try await DiskV1.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, concurrency: concurrency, progress: progress))
     case "v2":
-      layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, progress: progress))
+      layers.append(contentsOf: try await DiskV2.push(diskURL: diskURL, registry: registry, chunkSizeMb: chunkSizeMb, concurrency: concurrency, progress: progress))
     default:
       throw RuntimeError.OCIUnsupportedDiskFormat(diskFormat)
     }
diff --git a/Tests/TartTests/LayerizerTests.swift b/Tests/TartTests/LayerizerTests.swift
index d0f8dc7c..e170f3cc 100644
--- a/Tests/TartTests/LayerizerTests.swift
+++ b/Tests/TartTests/LayerizerTests.swift
@@ -36,7 +36,7 @@ final class LayerizerTests: XCTestCase {
     let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
 
     print("pushing disk...")
-    let diskLayers = try await DiskV1.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress())
+    let diskLayers = try await DiskV1.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, concurrency: 4, progress: Progress())
 
     print("pulling disk...")
     try await DiskV1.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress())
@@ -57,7 +57,7 @@ final class LayerizerTests: XCTestCase {
     let pulledDiskFileURL = FileManager.default.temporaryDirectory.appendingPathComponent(UUID().uuidString)
 
     print("pushing disk...")
-    let diskLayers = try await DiskV2.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, progress: Progress())
+    let diskLayers = try await DiskV2.push(diskURL: originalDiskFileURL, registry: registry, chunkSizeMb: 0, concurrency: 4, progress: Progress())
 
     print("pulling disk...")
     try await DiskV2.pull(registry: registry, diskLayers: diskLayers, diskURL: pulledDiskFileURL, concurrency: 16, progress: Progress())