Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,8 @@ func (fs *fileSystem) mintInode(ic inode.Core, parInodeCtx context.Context) (in
fs.newConfig,
fs.globalMaxWriteBlocksSem,
fs.mrdCache,
fs.traceHandle)
fs.traceHandle,
fs.metricHandle)
}

// Place it in our map of IDs to inodes.
Expand Down
1 change: 1 addition & 0 deletions internal/fs/handle/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func createFileInode(
semaphore.NewWeighted(100),
nil,
tracing.NewNoopTracer(),
metrics.NewNoopMetrics(),
)
}

Expand Down
4 changes: 3 additions & 1 deletion internal/fs/inode/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
storagemock "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/mock"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v3/internal/util"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -268,7 +269,8 @@ func (t *DirTest) createLocalFileInode(parent Name, name string, id fuseops.Inod
&cfg.Config{},
semaphore.NewWeighted(math.MaxInt64),
nil,
tracing.NewNoopTracer()) // mrdCache
tracing.NewNoopTracer(),
metrics.NewNoopMetrics()) // mrdCache
return
}

Expand Down
47 changes: 42 additions & 5 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v3/internal/util"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/syncutil"
Expand Down Expand Up @@ -127,8 +128,9 @@ type FileInode struct {
globalMaxWriteBlocksSem *semaphore.Weighted

// mrdInstance manages the MultiRangeDownloader instances for this inode.
mrdInstance *gcsx.MrdInstance
traceHandle tracing.TraceHandle
mrdInstance *gcsx.MrdInstance
metricHandle metrics.MetricHandle
traceHandle tracing.TraceHandle
}

var _ Inode = &FileInode{}
Expand All @@ -154,7 +156,8 @@ func NewFileInode(
cfg *cfg.Config,
globalMaxBlocksSem *semaphore.Weighted,
mrdCache *lru.Cache,
traceHandle tracing.TraceHandle) (f *FileInode) {
traceHandle tracing.TraceHandle,
metricHandle metrics.MetricHandle) (f *FileInode) {
// Set up the basic struct.
var minObj gcs.MinObject
if m != nil {
Expand All @@ -174,6 +177,7 @@ func NewFileInode(
config: cfg,
globalMaxWriteBlocksSem: globalMaxBlocksSem,
traceHandle: traceHandle,
metricHandle: metricHandle,
}

if f.bucket.BucketType().Zonal {
Expand Down Expand Up @@ -669,7 +673,7 @@ func (f *FileInode) Write(
offset int64,
openMode util.OpenMode) (bool, error) {
if f.bwh != nil {
return f.writeUsingBufferedWrites(ctx, data, offset)
return f.writeUsingBufferedWrites(ctx, data, offset, openMode)
}

return false, f.writeUsingTempFile(ctx, data, offset)
Expand Down Expand Up @@ -699,7 +703,7 @@ func (f *FileInode) writeUsingTempFile(ctx context.Context, data []byte, offset
// Helper function to serve write for file using buffered writes handler.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64) (_ bool, err error) {
func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64, openMode util.OpenMode) (_ bool, err error) {
bytes := int64(len(data))
ctx, finishSpan := f.traceHandle.TraceUpload(ctx, tracing.WriteFileStreaming, f.src.Name, &bytes, &err)
defer finishSpan()
Expand All @@ -714,6 +718,7 @@ func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, o
// Fall back to temp file for Out-Of-Order Writes.
if errors.Is(err, bufferedwrites.ErrOutOfOrderWrite) {
logger.Infof("Out of order write detected. File %s will now use legacy staged writes. "+StreamingWritesSemantics, f.name.String())
f.recordFallback(openMode, metrics.WriteFallbackReasonOutOfOrderAttr)
// Finalize the object.
err = f.flushUsingBufferedWriteHandler(ctx)
if err != nil {
Expand Down Expand Up @@ -1058,6 +1063,7 @@ func (f *FileInode) truncateUsingBufferedWriteHandler(ctx context.Context, size
// If truncate size is less than the total file size resulting in OutOfOrder write, finalize and fall back to temp file.
if errors.Is(err, bufferedwrites.ErrOutOfOrderWrite) {
logger.Infof("Out of order write detected. File %s will now use legacy staged writes. "+StreamingWritesSemantics, f.name.String())
f.recordFallback(util.NewOpenMode(util.WriteOnly, 0), metrics.WriteFallbackReasonOutOfOrderAttr)
// Finalize the object.
err = f.flushUsingBufferedWriteHandler(ctx)
if err != nil {
Expand Down Expand Up @@ -1098,6 +1104,31 @@ func (f *FileInode) Truncate(
return false, f.truncateUsingTempFile(ctx, size)
}

// recordFallback maps util.OpenMode to metrics.OpenMode and records the fallback metric.
func (f *FileInode) recordFallback(openMode util.OpenMode, reason metrics.WriteFallbackReason) {
if f.metricHandle == nil {
return
}
var metricOpenMode metrics.OpenMode
switch openMode.AccessMode() {
case util.ReadWrite:
if openMode.IsAppend() {
metricOpenMode = metrics.OpenModeReadWriteAppendAttr
} else {
metricOpenMode = metrics.OpenModeReadWriteAttr
}
case util.WriteOnly:
if openMode.IsAppend() {
metricOpenMode = metrics.OpenModeWriteOnlyAppendAttr
} else {
metricOpenMode = metrics.OpenModeWriteOnlyAttr
}
}
if metricOpenMode != "" {
f.metricHandle.FsStreamingWriteFallbackCount(1, metricOpenMode, reason)
Comment thread
thrivikram-karur-g marked this conversation as resolved.
}
}

// Ensures cache content on read if content cache enabled
func (f *FileInode) CacheEnsureContent(ctx context.Context) (err error) {
if f.localFileCache {
Expand Down Expand Up @@ -1136,6 +1167,9 @@ func (f *FileInode) InitBufferedWriteHandlerIfEligible(ctx context.Context, open

tempFileInUse := f.content != nil
if !f.config.Write.EnableStreamingWrites || tempFileInUse {
if !tempFileInUse {
f.recordFallback(openMode, metrics.WriteFallbackReasonNotEnabledAttr)
}
// bwh should not be initialized under these conditions.
return false, nil
}
Expand Down Expand Up @@ -1181,9 +1215,11 @@ func (f *FileInode) InitBufferedWriteHandlerIfEligible(ctx context.Context, open
"limit (set by --write-global-max-blocks) has been reached. To allow more concurrent files "+
"to use streaming writes, consider increasing this limit if sufficient memory is available. "+
"For more details on memory usage, see: https://github.com/GoogleCloudPlatform/gcsfuse/blob/master/docs/semantics.md#writes", f.name.String())
f.recordFallback(openMode, metrics.WriteFallbackReasonConcurrentLimitBreachedAttr)
return false, nil
}
if err != nil {
f.recordFallback(openMode, metrics.WriteFallbackReasonOtherAttr)
return false, fmt.Errorf("failed to create bufferedWriteHandler: %w", err)
}
f.bwh.SetMtime(f.mtimeClock.Now())
Expand All @@ -1201,5 +1237,6 @@ func (f *FileInode) areBufferedWritesSupported(openMode util.OpenMode, obj *gcs.
return true
}
logger.Infof("Existing file %s of size %d bytes (non-zero) will use legacy staged writes. "+StreamingWritesSemantics, f.name.String(), obj.Size)
f.recordFallback(openMode, metrics.WriteFallbackReasonExistingFileAttr)
return false
}
7 changes: 5 additions & 2 deletions internal/fs/inode/file_mock_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
storagemock "github.com/googlecloudplatform/gcsfuse/v3/internal/storage/mock"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v3/internal/util"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/syncutil"
Expand Down Expand Up @@ -119,7 +120,8 @@ func (t *FileMockBucketTest) createLockedInode(fileName string, fileType string)
&cfg.Config{},
semaphore.NewWeighted(math.MaxInt64),
nil,
tracing.NewNoopTracer())
tracing.NewNoopTracer(),
metrics.NewNoopMetrics())

// Create empty file for local inode created above.
err := t.in.CreateEmptyTempFile(t.ctx)
Expand Down Expand Up @@ -156,7 +158,8 @@ func (t *FileMockBucketTest) createGCSBackedFileInode(backingObj *gcs.MinObject)
&cfg.Config{},
semaphore.NewWeighted(math.MaxInt64),
nil,
tracing.NewNoopTracer())
tracing.NewNoopTracer(),
metrics.NewNoopMetrics())
f.Lock()
return f
}
Expand Down
4 changes: 3 additions & 1 deletion internal/fs/inode/file_streaming_writes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v3/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v3/metrics"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/operations"
"github.com/googlecloudplatform/gcsfuse/v3/tools/integration_tests/util/setup"
"github.com/googlecloudplatform/gcsfuse/v3/tracing"
Expand Down Expand Up @@ -154,7 +155,8 @@ func (t *FileStreamingWritesCommon) createInode(fileType string) {
&cfg.Config{},
semaphore.NewWeighted(math.MaxInt64),
nil,
tracing.NewNoopTracer())
tracing.NewNoopTracer(),
metrics.NewNoopMetrics())

// Set buffered write config for created inode.
t.in.config = &cfg.Config{Write: cfg.WriteConfig{
Expand Down
3 changes: 2 additions & 1 deletion internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ func (t *FileTest) createInodeWithLocalParam(fileName string, local bool) {
&cfg.Config{},
semaphore.NewWeighted(math.MaxInt64),
nil,
tracing.NewNoopTracer())
tracing.NewNoopTracer(),
metrics.NewNoopMetrics())

t.in.Lock()
}
Expand Down
24 changes: 24 additions & 0 deletions metrics/metric_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ const (
IoMethodOpenedAttr IoMethod = "opened"
)

// OpenMode is a custom type for the open_mode attribute.
type OpenMode string

// Allowed values for the open_mode attribute. Only write-capable modes are
// listed, since this attribute accompanies write-path metrics; the string
// values must match the "open_mode" values declared in metrics.yaml.
const (
	OpenModeReadWriteAttr       OpenMode = "read_write"       // O_RDWR without O_APPEND
	OpenModeReadWriteAppendAttr OpenMode = "read_write_append" // O_RDWR with O_APPEND
	OpenModeWriteOnlyAttr       OpenMode = "write_only"       // O_WRONLY without O_APPEND
	OpenModeWriteOnlyAppendAttr OpenMode = "write_only_append" // O_WRONLY with O_APPEND
)

// ReadType is a custom type for the read_type attribute.
type ReadType string

Expand Down Expand Up @@ -141,6 +151,17 @@ const (
RetryErrorCategorySTALLEDREADREQUESTAttr RetryErrorCategory = "STALLED_READ_REQUEST"
)

// WriteFallbackReason is a custom type for the write_fallback_reason attribute.
type WriteFallbackReason string

// Allowed values for the write_fallback_reason attribute, explaining why a
// file fell back from streaming writes to legacy staged (temp-file) writes.
// The string values must match the "write_fallback_reason" values declared
// in metrics.yaml.
const (
	WriteFallbackReasonConcurrentLimitBreachedAttr WriteFallbackReason = "concurrent_limit_breached" // global write-block semaphore exhausted
	WriteFallbackReasonExistingFileAttr            WriteFallbackReason = "existing_file"             // existing non-empty GCS object
	WriteFallbackReasonNotEnabledAttr              WriteFallbackReason = "not_enabled"               // streaming writes disabled by config
	WriteFallbackReasonOtherAttr                   WriteFallbackReason = "other"                     // any error not covered above
	WriteFallbackReasonOutOfOrderAttr              WriteFallbackReason = "out_of_order"              // non-sequential write offset detected
)

// MetricHandle provides an interface for recording metrics.
// The methods of this interface are auto-generated from metrics.yaml.
// Each method corresponds to a metric defined in metrics.yaml.
Expand Down Expand Up @@ -169,6 +190,9 @@ type MetricHandle interface {
// FsOpsLatency - The cumulative distribution of file system operation latencies
FsOpsLatency(ctx context.Context, latency time.Duration, fsOp FsOp)

// FsStreamingWriteFallbackCount - The cumulative number of streaming write fallbacks with reason attached
FsStreamingWriteFallbackCount(inc int64, openMode OpenMode, writeFallbackReason WriteFallbackReason)

// GcsDownloadBytesCount - The cumulative number of bytes downloaded from GCS along with type - Sequential/Random
GcsDownloadBytesCount(inc int64, readType ReadType)

Expand Down
20 changes: 20 additions & 0 deletions metrics/metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,26 @@
attribute-type: string
values: *fs_ops_list

- metric-name: "fs/streaming_write_fallback_count"
  description: "The cumulative number of streaming write fallbacks with reason attached"
  type: "int_counter"
  attributes:
    # Only write-capable open modes are listed: a streaming-write fallback can
    # only occur on a writable handle, so read-only modes never apply.
    - attribute-name: open_mode
      attribute-type: string
      values:
        - "write_only"
        - "write_only_append"
        - "read_write"
        - "read_write_append"
    - attribute-name: write_fallback_reason
      attribute-type: string
      values:
        - "out_of_order"
        - "existing_file"
        - "concurrent_limit_breached"
        - "not_enabled"
        - "other" # catch-all for fallback causes not covered by the values above

- metric-name: "gcs/download_bytes_count"
description: "The cumulative number of bytes downloaded from GCS along with type - Sequential/Random"
unit: "By"
Expand Down
3 changes: 3 additions & 0 deletions metrics/noop_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (*noopMetrics) FsOpsErrorCount(inc int64, fsErrorCategory FsErrorCategory,

func (*noopMetrics) FsOpsLatency(ctx context.Context, latency time.Duration, fsOp FsOp) {}

// FsStreamingWriteFallbackCount is a no-op implementation that discards the recorded value.
func (*noopMetrics) FsStreamingWriteFallbackCount(inc int64, openMode OpenMode, writeFallbackReason WriteFallbackReason) {
}

func (*noopMetrics) GcsDownloadBytesCount(inc int64, readType ReadType) {}

func (*noopMetrics) GcsReadBytesCount(inc int64) {}
Expand Down
Loading
Loading