feat(parallel random reads): Changes to enable parallel random read handling in random reader #3577

Open: wants to merge 4 commits into base: master
internal/gcsx/random_reader.go (188 changes: 123 additions & 65 deletions)
@@ -19,6 +19,7 @@ import (
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"time"

@@ -177,6 +178,15 @@ type randomReader struct {
// Specifies the next expected offset for the reads. Used to distinguish between
// sequential and random reads.
expectedOffset atomic.Int64

// To synchronize reads served from the range reader.
mu sync.Mutex
}

type readInfo struct {
readType int64
expectedOffset int64
seekRecorded bool
}
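
The new mutex and the readInfo snapshot enable a check-then-lock-then-recheck pattern: the read type is computed lock-free from atomics, and the lock is taken only when the shared, stateful range reader will actually serve the request. Below is a minimal, self-contained sketch of that pattern; toyReader, sequential, and the printed labels are illustrative stand-ins, not the real gcsfuse API.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type toyReader struct {
	mu             sync.Mutex   // guards only the shared range-reader path
	expectedOffset atomic.Int64 // lock-free read-pattern state
}

// sequential stands in for getReadInfo: it inspects atomics only, so
// concurrent random reads never block on it.
func (r *toyReader) sequential(offset int64) bool {
	return offset == r.expectedOffset.Load()
}

func (r *toyReader) read(offset int64) {
	if r.sequential(offset) { // unlocked fast check
		r.mu.Lock()
		// Re-check under the lock: another read may have advanced
		// expectedOffset since the unlocked check.
		if r.sequential(offset) {
			defer r.mu.Unlock()
			r.expectedOffset.Store(offset + 1)
			fmt.Println("served by the shared range reader:", offset)
			return
		}
		r.mu.Unlock()
	}
	fmt.Println("served lock-free (MRD-style):", offset)
}

func main() {
	var r toyReader
	var wg sync.WaitGroup
	for _, off := range []int64{0, 512, 0} {
		wg.Add(1)
		go func(o int64) { defer wg.Done(); r.read(o) }(off)
	}
	wg.Wait()
}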

func (rr *randomReader) CheckInvariants() {
@@ -311,6 +321,12 @@ func (rr *randomReader) ReadAt(
if offset >= int64(rr.object.Size) {
err = io.EOF
return
} else if offset < 0 {
err = fmt.Errorf(
"illegal offset %d for %d-byte object",
offset,
rr.object.Size)
return
}

// Note: If we are reading the file for the first time and read type is sequential
@@ -329,60 +345,72 @@ func (rr *randomReader) ReadAt(
return
}

// Check first if we can read using the existing reader. If not, determine which
// API to use and call GCS accordingly.

// When the offset is AFTER the reader position, try to seek forward, within reason.
// This happens when the kernel page cache serves some data. It's very common for
// concurrent reads, often by only a few 128kB fuse read requests. The aim is to
// re-use GCS connection and avoid throwing away already read data.
// For parallel sequential reads to a single file, not throwing away the connections
// is a 15-20x improvement in throughput: 150-200 MiB/s instead of 10 MiB/s.
if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize {
bytesToSkip := offset - rr.start
discardedBytes, copyError := io.CopyN(io.Discard, rr.reader, bytesToSkip)
// io.EOF is expected if the reader is shorter than the requested offset to read.
if copyError != nil && !errors.Is(copyError, io.EOF) {
logger.Warnf("Error while skipping reader bytes: %v", copyError)
// Not taking any lock for getting reader type to ensure random read requests do not wait.
readInfo := rr.getReadInfo(offset, false)
reqReaderType := readerType(readInfo.readType, rr.bucket.BucketType())

if reqReaderType == RangeReader {
rr.mu.Lock()
expectedOffset := rr.expectedOffset.Load()

// Recalculate the reader type for zonal buckets in case another read has been
// served since the last computation. This ensures we don't incorrectly use the
// range reader when MRD should have been used.
if rr.bucket.BucketType().Zonal && readInfo.expectedOffset != expectedOffset {
readInfo = rr.getReadInfo(offset, readInfo.seekRecorded)
reqReaderType = readerType(readInfo.readType, rr.bucket.BucketType())
}
rr.start += discardedBytes
}

// If we have an existing reader, but it's positioned at the wrong place,
// clean it up and throw it away.
// We will also clean up the existing reader if it can't serve the entire request.
dataToRead := math.Min(float64(offset+int64(len(p))), float64(rr.object.Size))
if rr.reader != nil && (rr.start != offset || int64(dataToRead) > rr.limit) {
rr.closeReader()
rr.reader = nil
rr.cancel = nil
}
if reqReaderType == MultiRangeReader {
rr.mu.Unlock()
} else {
defer rr.mu.Unlock()

// Check first if we can read using the existing reader. If not, determine which
// API to use and call GCS accordingly.

// When the offset is AFTER the reader position, try to seek forward, within reason.
// This happens when the kernel page cache serves some data. It's very common for
// concurrent reads, often by only a few 128kB fuse read requests. The aim is to
// re-use GCS connection and avoid throwing away already read data.
// For parallel sequential reads to a single file, not throwing away the connections
// is a 15-20x improvement in throughput: 150-200 MiB/s instead of 10 MiB/s.
if rr.reader != nil && rr.start < offset && offset-rr.start < maxReadSize {
bytesToSkip := offset - rr.start
discardedBytes, copyError := io.CopyN(io.Discard, rr.reader, bytesToSkip)
// io.EOF is expected if the reader is shorter than the requested offset to read.
if copyError != nil && !errors.Is(copyError, io.EOF) {
logger.Warnf("Error while skipping reader bytes: %v", copyError)
}
rr.start += discardedBytes
}

if rr.reader != nil {
objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, -1, rr.readType.Load())
return
}
// If we have an existing reader, but it's positioned at the wrong place,
// clean it up and throw it away.
// We will also clean up the existing reader if it can't serve the entire request.
dataToRead := math.Min(float64(offset+int64(len(p))), float64(rr.object.Size))
if rr.reader != nil && (rr.start != offset || int64(dataToRead) > rr.limit) {
rr.closeReader()
rr.reader = nil
rr.cancel = nil
}

// If the data can't be served from the existing reader, then we need to update the seeks.
// If the current offset is not the same as the expected offset, it's a random read.
if expectedOffset := rr.expectedOffset.Load(); expectedOffset != 0 && expectedOffset != offset {
rr.seeks.Add(1)
}
if rr.reader != nil {
objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, -1, rr.readType.Load())
return
}

// If we don't have a reader, determine whether to read from NewReader or MRR.
end, err := rr.getReadInfo(offset, int64(len(p)))
if err != nil {
err = fmt.Errorf("ReadAt: getReaderInfo: %w", err)
return
// The reader does not exist and needs to be created; get the end offset.
end := rr.getEndOffset(offset)
objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, end, readInfo.readType)
return
}
}

readerType := readerType(rr.readType.Load(), offset, end, rr.bucket.BucketType())
if readerType == RangeReader {
objectData.Size, err = rr.readFromRangeReader(ctx, p, offset, end, rr.readType.Load())
return
if reqReaderType == MultiRangeReader {
objectData.Size, err = rr.readFromMultiRangeReader(ctx, p, offset, offset+int64(len(p)), TimeoutForMultiRangeRead)
}

objectData.Size, err = rr.readFromMultiRangeReader(ctx, p, offset, end, TimeoutForMultiRangeRead)
return
}
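
The retained forward-seek branch is what keeps a single GCS connection alive when the kernel page cache absorbs part of a sequential stream. Here is a standalone sketch of that skip, with strings.Reader standing in for the GCS reader and an assumed 8 MiB forward-seek window:

package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

func main() {
	const maxReadSize = 8 << 20 // assumed 8 MiB forward-seek window

	rd := strings.NewReader("0123456789") // stands in for the open GCS reader
	var start int64 = 0                   // current reader position
	var offset int64 = 6                  // requested offset, ahead of position

	if gap := offset - start; gap > 0 && gap < maxReadSize {
		// Discard the gap rather than dropping the connection.
		skipped, err := io.CopyN(io.Discard, rd, gap)
		if err != nil && !errors.Is(err, io.EOF) {
			fmt.Println("skip failed:", err)
		}
		start += skipped
	}

	buf := make([]byte, 4)
	n, _ := rd.Read(buf)
	fmt.Printf("read %q starting at offset %d\n", buf[:n], start) // "6789" at 6
}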

@@ -516,26 +544,58 @@ func (rr *randomReader) startRead(start int64, end int64) (err error) {
return
}

// getReadInfo determines the readType and provides the range to query GCS.
// Range here is [start, end]. End is computed using the readType, start offset,
// and the size of the data the caller needs.
func (rr *randomReader) getReadInfo(
start int64,
size int64) (end int64, err error) {
// Make sure start and size are legal.
if start < 0 || uint64(start) > rr.object.Size || size < 0 {
err = fmt.Errorf(
"range [%d, %d) is illegal for %d-byte object",
start,
start+size,
rr.object.Size)
return
// isSeekNeeded determines whether the current read at `offset` should be considered
// a seek, given the previous read pattern and the expected offset.
func isSeekNeeded(readType, offset, expectedOffset int64) bool {
if expectedOffset == 0 {
return false
} else if readType == metrics.ReadTypeRandom {
return offset != expectedOffset
} else if readType == metrics.ReadTypeSequential {
return offset < expectedOffset || offset > expectedOffset+maxReadSize
}
return false
}
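
A few worked cases make the asymmetry in isSeekNeeded concrete: sequential reads tolerate a forward gap of up to maxReadSize, while random reads count any mismatch as a seek. The snippet below mirrors the function's logic with assumed stand-in constants (maxReadSize = 8 MiB; the metrics read-type values are inlined):

package main

import "fmt"

const (
	maxReadSize        = 8 << 20 // assumed 8 MiB
	readTypeSequential = int64(0) // stand-ins for metrics.ReadTypeSequential/Random
	readTypeRandom     = int64(1)
)

// Mirrors isSeekNeeded from the diff.
func isSeekNeeded(readType, offset, expectedOffset int64) bool {
	if expectedOffset == 0 {
		return false // first read: never a seek
	}
	if readType == readTypeRandom {
		return offset != expectedOffset
	}
	if readType == readTypeSequential {
		return offset < expectedOffset || offset > expectedOffset+maxReadSize
	}
	return false
}

func main() {
	fmt.Println(isSeekNeeded(readTypeSequential, 4<<20, 1<<20)) // false: forward gap < 8 MiB
	fmt.Println(isSeekNeeded(readTypeSequential, 0, 1<<20))     // true: backward jump
	fmt.Println(isSeekNeeded(readTypeRandom, 1<<20, 2<<20))     // true: any mismatch
	fmt.Println(isSeekNeeded(readTypeRandom, 2<<20, 2<<20))     // false: exact continuation
}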

// getReadInfo determines the read strategy (sequential or random) for a read
// request at a given offset and returns read metadata. It also updates the
// reader's internal state based on the read pattern.
func (rr *randomReader) getReadInfo(offset int64, seekRecorded bool) readInfo {
readType := rr.readType.Load()
expOffset := rr.expectedOffset.Load()
numSeeks := rr.seeks.Load()

if !seekRecorded && isSeekNeeded(readType, offset, expOffset) {
numSeeks = rr.seeks.Add(1)
seekRecorded = true
}

if err != nil {
return
if numSeeks >= minSeeksForRandom {
readType = metrics.ReadTypeRandom
}

averageReadBytes := rr.totalReadBytes.Load()
if numSeeks > 0 {
averageReadBytes /= numSeeks
}

if averageReadBytes >= maxReadSize {
readType = metrics.ReadTypeSequential
}

rr.readType.Store(readType)
return readInfo{
readType: readType,
expectedOffset: expOffset,
seekRecorded: seekRecorded,
}
}
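
To see how getReadInfo classifies a stream, here is a reduced model of just the decision logic. The thresholds minSeeksForRandom = 2 and maxReadSize = 8 MiB are assumptions based on the surrounding code, not values confirmed by this diff:

package main

import "fmt"

const (
	miB               = int64(1 << 20)
	maxReadSize       = 8 * miB // assumed threshold
	minSeeksForRandom = int64(2) // assumed threshold
)

// classify mirrors the decision order in getReadInfo: the seek count
// first, then the average-bytes-per-seek override.
func classify(numSeeks, totalReadBytes int64) string {
	readType := "sequential"
	if numSeeks >= minSeeksForRandom {
		readType = "random"
	}
	avg := totalReadBytes
	if numSeeks > 0 {
		avg /= numSeeks
	}
	if avg >= maxReadSize {
		readType = "sequential" // large reads dominate: treat as sequential
	}
	return readType
}

func main() {
	fmt.Println(classify(3, 3*miB))  // random: many seeks, small reads
	fmt.Println(classify(3, 24*miB)) // sequential: the 8 MiB average overrides
	fmt.Println(classify(1, 1*miB))  // sequential: below the seek threshold
}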

// getEndOffset returns the end offset for the range to query GCS.
// Range here is [start, end]. End is computed for sequential reads using the
// start offset and the size of the data the caller needs.
func (rr *randomReader) getEndOffset(
start int64) (end int64) {
// GCS requests are expensive. Prefer to issue read requests defined by the
// sequentialReadSizeMb flag. Sequential reads will simply sip from the fire hose
// with each call to ReadAt. In practice, GCS will fill the TCP buffers
@@ -549,7 +609,6 @@ func (rr *randomReader) getReadInfo(
// (average read size in bytes rounded up to the next MiB).
end = int64(rr.object.Size)
if seeks := rr.seeks.Load(); seeks >= minSeeksForRandom {
rr.readType.Store(metrics.ReadTypeRandom)
averageReadBytes := rr.totalReadBytes.Load() / seeks
if averageReadBytes < maxReadSize {
randomReadSize := int64(((averageReadBytes / MiB) + 1) * MiB)
Expand Down Expand Up @@ -577,9 +636,8 @@ func (rr *randomReader) getReadInfo(
}
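
The random-read sizing above rounds the observed average up to the next whole MiB; for example, an average of 2.5 MiB per seek yields a 3 MiB request. A tiny worked check of that expression:

package main

import "fmt"

const miB = int64(1 << 20)

func main() {
	averageReadBytes := 5 * miB / 2 // 2.5 MiB observed per seek
	// Same expression as in getEndOffset: integer-divide, then bump one MiB.
	randomReadSize := ((averageReadBytes / miB) + 1) * miB
	fmt.Println(randomReadSize / miB) // 3
}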

// readerType specifies the go-sdk interface to use for reads.
func readerType(readType int64, start int64, end int64, bucketType gcs.BucketType) ReaderType {
bytesToBeRead := end - start
if readType == metrics.ReadTypeRandom && bytesToBeRead < maxReadSize && bucketType.Zonal {
func readerType(readType int64, bucketType gcs.BucketType) ReaderType {
if readType == metrics.ReadTypeRandom && bucketType.Zonal {
return MultiRangeReader
}
return RangeReader