这是indexloc提供的服务,不要输入任何密码
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 1 addition & 19 deletions Package.resolved

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ let package = Package(
.package(url: "https://github.com/apple/swift-argument-parser", from: "1.3.1"),
.package(url: "https://github.com/mhdhejazi/Dynamic", branch: "master"),
.package(url: "https://github.com/apple/swift-algorithms", from: "1.2.0"),
.package(url: "https://github.com/apple/swift-async-algorithms", branch: "main"),
.package(url: "https://github.com/malcommac/SwiftDate", from: "7.0.0"),
.package(url: "https://github.com/antlr/antlr4", exact: "4.13.2"),
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0")),
Expand All @@ -29,7 +28,6 @@ let package = Package(
targets: [
.executableTarget(name: "tart", dependencies: [
.product(name: "Algorithms", package: "swift-algorithms"),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms"),
.product(name: "ArgumentParser", package: "swift-argument-parser"),
.product(name: "Dynamic", package: "Dynamic"),
.product(name: "SwiftDate", package: "SwiftDate"),
Expand Down
122 changes: 66 additions & 56 deletions Sources/tart/Fetcher.swift
Original file line number Diff line number Diff line change
@@ -1,80 +1,90 @@
import Foundation
import AsyncAlgorithms

fileprivate let urlSession = createURLSession()
fileprivate var urlSessionConfiguration: URLSessionConfiguration {
let config = URLSessionConfiguration.default

class DownloadDelegate: NSObject, URLSessionTaskDelegate {
let progress: Progress
init(_ progress: Progress) throws {
self.progress = progress
}
// Harbor expects a CSRF token to be present if the HTTP client
// carries a session cookie between its requests[1] and fails if
// it was not present[2].
//
// To fix that, we disable the automatic cookies carry in URLSession.
//
// [1]: https://github.com/goharbor/harbor/blob/a4c577f9ec4f18396207a5e686433a6ba203d4ef/src/server/middleware/csrf/csrf.go#L78
// [2]: https://github.com/cirruslabs/tart/issues/295
config.httpShouldSetCookies = false

func urlSession(_ session: URLSession, didCreateTask task: URLSessionTask) {
self.progress.addChild(task.progress, withPendingUnitCount: self.progress.totalUnitCount)
}
return config
}

class Fetcher {
static func fetch(_ request: URLRequest, viaFile: Bool = false, progress: Progress? = nil) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let delegate = progress != nil ? try DownloadDelegate(progress!) : nil
static func fetch(_ request: URLRequest, viaFile: Bool = false, progress: Progress? = nil) async throws -> (AsyncThrowingStream<Data, Error>, HTTPURLResponse) {
let delegate = Delegate()
let session = URLSession(configuration: urlSessionConfiguration, delegate: delegate, delegateQueue: nil)
let task = session.dataTask(with: request)

let stream = AsyncThrowingStream<Data, Error> { continuation in
delegate.streamContinuation = continuation
}

if viaFile {
return try await fetchViaFile(request, delegate: delegate)
let response = try await withCheckedThrowingContinuation { continuation in
delegate.responseContinuation = continuation
task.resume()
}

return try await fetchViaMemory(request, delegate: delegate)
return (stream, response as! HTTPURLResponse)
}
}

private static func fetchViaMemory(_ request: URLRequest, delegate: URLSessionTaskDelegate? = nil) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let dataCh = AsyncThrowingChannel<Data, Error>()
fileprivate class Delegate: NSObject, URLSessionDataDelegate {
var responseContinuation: CheckedContinuation<URLResponse, Error>?
var streamContinuation: AsyncThrowingStream<Data, Error>.Continuation?

let (data, response) = try await urlSession.data(for: request, delegate: delegate)
private var buffer: Data = Data()
private let bufferFlushSize = 16 * 1024 * 1024

Task {
await dataCh.send(data)
func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive response: URLResponse,
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
) {
// Soft-limit for the maximum buffer capacity
let capacity = min(response.expectedContentLength, Int64(bufferFlushSize))

dataCh.finish()
}
// Pre-initialize buffer as we now know the capacity
buffer = Data(capacity: Int(capacity))

return (dataCh, response as! HTTPURLResponse)
responseContinuation?.resume(returning: response)
completionHandler(.allow)
}

private static func fetchViaFile(_ request: URLRequest, delegate: URLSessionTaskDelegate? = nil) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
let dataCh = AsyncThrowingChannel<Data, Error>()

let (fileURL, response) = try await urlSession.download(for: request, delegate: delegate)

// Acquire a handle to the downloaded file and then remove it.
//
// This keeps a working reference to that file, yet we don't
// have to deal with the cleanup any more.
let mappedFile = try Data(contentsOf: fileURL, options: [.alwaysMapped])
try FileManager.default.removeItem(at: fileURL)
func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive data: Data
) {
buffer.append(data)

Task {
for chunk in (0 ..< mappedFile.count).chunks(ofCount: 64 * 1024 * 1024) {
await dataCh.send(mappedFile.subdata(in: chunk))
}

dataCh.finish()
if buffer.count >= bufferFlushSize {
streamContinuation?.yield(buffer)
buffer.removeAll(keepingCapacity: true)
}

return (dataCh, response as! HTTPURLResponse)
}
}

fileprivate func createURLSession() -> URLSession {
let config = URLSessionConfiguration.default

// Harbor expects a CSRF token to be present if the HTTP client
// carries a session cookie between its requests[1] and fails if
// it was not present[2].
//
// To fix that, we disable the automatic cookies carry in URLSession.
//
// [1]: https://github.com/goharbor/harbor/blob/a4c577f9ec4f18396207a5e686433a6ba203d4ef/src/server/middleware/csrf/csrf.go#L78
// [2]: https://github.com/cirruslabs/tart/issues/295
config.httpShouldSetCookies = false
func urlSession(
_ session: URLSession,
task: URLSessionTask,
didCompleteWithError error: Error?
) {
if !buffer.isEmpty {
streamContinuation?.yield(buffer)
buffer.removeAll(keepingCapacity: true)
}

return URLSession(configuration: config)
if let error = error {
streamContinuation?.finish(throwing: error)
} else {
streamContinuation?.finish()
}
}
}
31 changes: 15 additions & 16 deletions Sources/tart/OCI/Layerizer/DiskV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ class DiskV2: Disk {
private static let bufferSizeBytes = 4 * 1024 * 1024
private static let layerLimitBytes = 512 * 1024 * 1024

// A zero chunk for faster than byte-by-byte comparisons
//
// Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
// on the last block since it costs only 4 MiB of excess data per 512 MiB layer.
//
// Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
// +--------------------------------------+---------------------------------------------------+
// | Operation | time(1) result |
// +--------------------------------------+---------------------------------------------------+
// | Data(...) == zeroChunk | 2.16s user 11.71s system 73% cpu 18.928 total |
// | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
// +--------------------------------------+---------------------------------------------------+
private static let holeGranularityBytes = 4 * 1024 * 1024
private static let zeroChunk = Data(count: holeGranularityBytes)

static func push(diskURL: URL, registry: Registry, chunkSizeMb: Int, concurrency: UInt, progress: Progress) async throws -> [OCIManifestLayer] {
var pushedLayers: [(index: Int, pushedLayer: OCIManifestLayer)] = []

Expand Down Expand Up @@ -215,22 +230,6 @@ class DiskV2: Disk {
}

private static func zeroSkippingWrite(_ disk: FileHandle, _ rdisk: FileHandle?, _ fsBlockSize: UInt64, _ offset: UInt64, _ data: Data) throws -> UInt64 {
let holeGranularityBytes = 64 * 1024

// A zero chunk for faster than byte-by-byte comparisons
//
// Assumes that the other Data(...) is equal in size, but it's fine to get a false-negative
// on the last block since it costs only 64 KiB of excess data per 500 MB layer.
//
// Some simple benchmarks ("sync && sudo purge" command was used to negate the disk caching effects):
// +--------------------------------------+---------------------------------------------------+
// | Operation | time(1) result |
// +--------------------------------------+---------------------------------------------------+
// | Data(...) == zeroChunk | 2.16s user 11.71s system 73% cpu 18.928 total |
// | Data(...).contains(where: {$0 != 0}) | 603.68s user 12.97s system 99% cpu 10:22.85 total |
// +--------------------------------------+---------------------------------------------------+
let zeroChunk = Data(count: holeGranularityBytes)

var offset = offset

for chunk in data.chunks(ofCount: holeGranularityBytes) {
Expand Down
7 changes: 3 additions & 4 deletions Sources/tart/OCI/Registry.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Foundation
import Algorithms
import AsyncAlgorithms

enum RegistryError: Error {
case UnexpectedHTTPStatusCode(when: String, code: Int, details: String = "")
Expand Down Expand Up @@ -31,7 +30,7 @@ extension Data {
}
}

extension AsyncThrowingChannel<Data, Error> {
extension AsyncThrowingStream<Data, Error> {
func asData() async throws -> Data {
var result = Data()

Expand Down Expand Up @@ -307,7 +306,7 @@ class Registry {
body: Data? = nil,
doAuth: Bool = true,
viaFile: Bool = false
) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
) async throws -> (AsyncThrowingStream<Data, Error>, HTTPURLResponse) {
var urlComponents = urlComponents

if urlComponents.queryItems == nil && !parameters.isEmpty {
Expand Down Expand Up @@ -413,7 +412,7 @@ class Registry {
return nil
}

private func authAwareRequest(request: URLRequest, viaFile: Bool = false, doAuth: Bool) async throws -> (AsyncThrowingChannel<Data, Error>, HTTPURLResponse) {
private func authAwareRequest(request: URLRequest, viaFile: Bool = false, doAuth: Bool) async throws -> (AsyncThrowingStream<Data, Error>, HTTPURLResponse) {
var request = request

if doAuth {
Expand Down
1 change: 0 additions & 1 deletion Sources/tart/VM.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import Foundation
import Virtualization
import AsyncAlgorithms
import Semaphore

struct UnsupportedRestoreImageError: Error {
Expand Down
Loading