fix: add VolumeStats cache to avoid massive statfs calls
This commit is contained in:
parent
cab0f64a70
commit
432a3b9cbc
@ -26,12 +26,13 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
|
||||
nodeID = flag.String("nodeid", "", "node id")
|
||||
mountPermissions = flag.Uint64("mount-permissions", 0, "mounted folder permissions")
|
||||
driverName = flag.String("drivername", nfs.DefaultDriverName, "name of the driver")
|
||||
workingMountDir = flag.String("working-mount-dir", "/tmp", "working directory for provisioner to mount nfs shares temporarily")
|
||||
defaultOnDeletePolicy = flag.String("default-ondelete-policy", "", "default policy for deleting subdirectory when deleting a volume")
|
||||
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
|
||||
nodeID = flag.String("nodeid", "", "node id")
|
||||
mountPermissions = flag.Uint64("mount-permissions", 0, "mounted folder permissions")
|
||||
driverName = flag.String("drivername", nfs.DefaultDriverName, "name of the driver")
|
||||
workingMountDir = flag.String("working-mount-dir", "/tmp", "working directory for provisioner to mount nfs shares temporarily")
|
||||
defaultOnDeletePolicy = flag.String("default-ondelete-policy", "", "default policy for deleting subdirectory when deleting a volume")
|
||||
volStatsCacheExpireInMinutes = flag.Int("vol-stats-cache-expire-in-minutes", 10, "The cache expire time in minutes for volume stats cache")
|
||||
)
|
||||
|
||||
func main() {
|
||||
@ -48,12 +49,13 @@ func main() {
|
||||
|
||||
func handle() {
|
||||
driverOptions := nfs.DriverOptions{
|
||||
NodeID: *nodeID,
|
||||
DriverName: *driverName,
|
||||
Endpoint: *endpoint,
|
||||
MountPermissions: *mountPermissions,
|
||||
WorkingMountDir: *workingMountDir,
|
||||
DefaultOnDeletePolicy: *defaultOnDeletePolicy,
|
||||
NodeID: *nodeID,
|
||||
DriverName: *driverName,
|
||||
Endpoint: *endpoint,
|
||||
MountPermissions: *mountPermissions,
|
||||
WorkingMountDir: *workingMountDir,
|
||||
DefaultOnDeletePolicy: *defaultOnDeletePolicy,
|
||||
VolStatsCacheExpireInMinutes: *volStatsCacheExpireInMinutes,
|
||||
}
|
||||
d := nfs.NewDriver(&driverOptions)
|
||||
d.Run(false)
|
||||
|
||||
@ -19,6 +19,7 @@ package nfs
|
||||
import (
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"k8s.io/klog/v2"
|
||||
@ -29,12 +30,13 @@ import (
|
||||
|
||||
// DriverOptions defines driver parameters specified in driver deployment
|
||||
type DriverOptions struct {
|
||||
NodeID string
|
||||
DriverName string
|
||||
Endpoint string
|
||||
MountPermissions uint64
|
||||
WorkingMountDir string
|
||||
DefaultOnDeletePolicy string
|
||||
NodeID string
|
||||
DriverName string
|
||||
Endpoint string
|
||||
MountPermissions uint64
|
||||
WorkingMountDir string
|
||||
DefaultOnDeletePolicy string
|
||||
VolStatsCacheExpireInMinutes int
|
||||
}
|
||||
|
||||
type Driver struct {
|
||||
@ -53,7 +55,8 @@ type Driver struct {
|
||||
volumeLocks *VolumeLocks
|
||||
|
||||
// a timed cache storing volume stats <volumeID, volumeStats>
|
||||
volStatsCache azcache.Resource
|
||||
volStatsCache azcache.Resource
|
||||
volStatsCacheExpireInMinutes int
|
||||
}
|
||||
|
||||
const (
|
||||
@ -81,12 +84,13 @@ func NewDriver(options *DriverOptions) *Driver {
|
||||
klog.V(2).Infof("Driver: %v version: %v", options.DriverName, driverVersion)
|
||||
|
||||
n := &Driver{
|
||||
name: options.DriverName,
|
||||
version: driverVersion,
|
||||
nodeID: options.NodeID,
|
||||
endpoint: options.Endpoint,
|
||||
mountPermissions: options.MountPermissions,
|
||||
workingMountDir: options.WorkingMountDir,
|
||||
name: options.DriverName,
|
||||
version: driverVersion,
|
||||
nodeID: options.NodeID,
|
||||
endpoint: options.Endpoint,
|
||||
mountPermissions: options.MountPermissions,
|
||||
workingMountDir: options.WorkingMountDir,
|
||||
volStatsCacheExpireInMinutes: options.VolStatsCacheExpireInMinutes,
|
||||
}
|
||||
|
||||
n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
|
||||
@ -102,6 +106,16 @@ func NewDriver(options *DriverOptions) *Driver {
|
||||
csi.NodeServiceCapability_RPC_UNKNOWN,
|
||||
})
|
||||
n.volumeLocks = NewVolumeLocks()
|
||||
|
||||
if options.VolStatsCacheExpireInMinutes <= 0 {
|
||||
options.VolStatsCacheExpireInMinutes = 10 // default expire in 10 minutes
|
||||
}
|
||||
|
||||
var err error
|
||||
getter := func(key string) (interface{}, error) { return nil, nil }
|
||||
if n.volStatsCache, err = azcache.NewTimedCache(time.Duration(options.VolStatsCacheExpireInMinutes)*time.Minute, getter, false); err != nil {
|
||||
klog.Fatalf("%v", err)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
|
||||
@ -20,9 +20,11 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/stretchr/testify/assert"
|
||||
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -52,6 +54,8 @@ func NewEmptyDriver(emptyField string) *Driver {
|
||||
}
|
||||
}
|
||||
d.volumeLocks = NewVolumeLocks()
|
||||
getter := func(key string) (interface{}, error) { return nil, nil }
|
||||
d.volStatsCache, _ = azcache.NewTimedCache(time.Minute, getter, false)
|
||||
return d
|
||||
}
|
||||
|
||||
|
||||
@ -30,6 +30,8 @@ import (
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
mount "k8s.io/mount-utils"
|
||||
|
||||
azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache"
|
||||
)
|
||||
|
||||
// NodeServer driver
|
||||
@ -195,6 +197,17 @@ func (ns *NodeServer) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolu
|
||||
return nil, status.Error(codes.InvalidArgument, "NodeGetVolumeStats volume path was empty")
|
||||
}
|
||||
|
||||
// check if the volume stats is cached
|
||||
cache, err := ns.Driver.volStatsCache.Get(req.VolumeId, azcache.CacheReadTypeDefault)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
if cache != nil {
|
||||
resp := cache.(csi.NodeGetVolumeStatsResponse)
|
||||
klog.V(6).Infof("NodeGetVolumeStats: volume stats for volume %s path %s is cached", req.VolumeId, req.VolumePath)
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
if _, err := os.Lstat(req.VolumePath); err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil, status.Errorf(codes.NotFound, "path %s does not exist", req.VolumePath)
|
||||
@ -233,7 +246,7 @@ func (ns *NodeServer) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolu
|
||||
return nil, status.Errorf(codes.Internal, "failed to transform disk inodes used(%v)", volumeMetrics.InodesUsed)
|
||||
}
|
||||
|
||||
return &csi.NodeGetVolumeStatsResponse{
|
||||
resp := csi.NodeGetVolumeStatsResponse{
|
||||
Usage: []*csi.VolumeUsage{
|
||||
{
|
||||
Unit: csi.VolumeUsage_BYTES,
|
||||
@ -248,7 +261,11 @@ func (ns *NodeServer) NodeGetVolumeStats(_ context.Context, req *csi.NodeGetVolu
|
||||
Used: inodesUsed,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// cache the volume stats per volume
|
||||
ns.Driver.volStatsCache.Set(req.VolumeId, &resp)
|
||||
return &resp, err
|
||||
}
|
||||
|
||||
// NodeUnstageVolume unstage volume
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user