From ce66bf8a858ee29b2466c8200157f8d7a1acb730 Mon Sep 17 00:00:00 2001 From: Jan Wozniak Date: Fri, 17 Mar 2023 17:11:06 +0100 Subject: [PATCH] feat: implement volume snapshots --- pkg/nfs/controllerserver.go | 298 +++++++++++++++++++++++++++++++++++- pkg/nfs/nfs.go | 1 + 2 files changed, 291 insertions(+), 8 deletions(-) diff --git a/pkg/nfs/controllerserver.go b/pkg/nfs/controllerserver.go index eb8c52b0..a1911ac0 100644 --- a/pkg/nfs/controllerserver.go +++ b/pkg/nfs/controllerserver.go @@ -18,6 +18,7 @@ package nfs import ( "fmt" + "io/fs" "os" "os/exec" "path/filepath" @@ -29,6 +30,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" "k8s.io/klog/v2" ) @@ -59,6 +61,35 @@ type nfsVolume struct { onDelete string } +// nfsSnapshot is an internal representation of a volume snapshot +// created by the provisioner. +type nfsSnapshot struct { + // Snapshot id. + id string + // Address of the NFS server. + // Matches paramServer. + server string + // Base directory of the NFS server to create snapshots under + // Matches paramShare. + baseDir string + // Snapshot name. + uuid string + // Source volume. + src string +} + +func (snap nfsSnapshot) archiveSubPath() string { + return snap.uuid +} + +func (snap nfsSnapshot) archiveName() string { + return fmt.Sprintf("%v.tar.gz", snap.src) +} + +func (snap nfsSnapshot) archivePath() string { + return filepath.Join(snap.archiveSubPath(), snap.archiveName()) +} + // Ordering of elements in the CSI volume id. // ID is of the form {server}/{baseDir}/{subDir}. // TODO: This volume id format limits baseDir and @@ -74,6 +105,19 @@ const ( totalIDElements // Always last ) +// Ordering of elements in the CSI snapshot id. +// ID is of the form {server}#{baseDir}#{snapName}#{snapName}#{srcVolumeName}. 
+// Adding a new element should always go at the end +// before totalIDSnapElements +const ( + idSnapServer = iota + idSnapBaseDir + idSnapUUID + idSnapArchivePath + idSnapArchiveName + totalIDSnapElements // Always last +) + // CreateVolume create a volume func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { name := req.GetName() @@ -263,11 +307,115 @@ func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req * } func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + if len(req.GetName()) == 0 { + return nil, status.Error(codes.InvalidArgument, "CreateSnapshot name must be provided") + } + if len(req.GetSourceVolumeId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "CreateSnapshot source volume ID must be provided") + } + + srcVol, err := getNfsVolFromID(req.GetSourceVolumeId()) + if err != nil { + return nil, status.Errorf(codes.NotFound, "failed to create source volume: %v", err) + } + snapshot, err := newNFSSnapshot(req.GetName(), req.GetParameters(), srcVol) + if err != nil { + return nil, status.Errorf(codes.NotFound, "failed to create nfsSnapshot: %v", err) + } + snapVol := volumeFromSnapshot(snapshot) + if err = cs.internalMount(ctx, snapVol, nil, nil); err != nil { + return nil, status.Errorf(codes.Internal, "failed to mount snapshot nfs server: %v", err) + } + defer func() { + if err = cs.internalUnmount(ctx, snapVol); err != nil { + klog.Warningf("failed to unmount snapshot nfs server: %v", err) + } + }() + snapInternalVolPath := filepath.Join(getInternalVolumePath(cs.Driver.workingMountDir, snapVol), snapshot.archiveSubPath()) + if err = os.MkdirAll(snapInternalVolPath, 0777); err != nil { + return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err) + } + if err := 
validateSnapshot(snapInternalVolPath, snapshot); err != nil { + return nil, err + } + + if err = cs.internalMount(ctx, srcVol, nil, nil); err != nil { + return nil, status.Errorf(codes.Internal, "failed to mount src nfs server: %v", err) + } + defer func() { + if err = cs.internalUnmount(ctx, srcVol); err != nil { + klog.Warningf("failed to unmount src nfs server: %v", err) + } + }() + + srcPath := getInternalVolumePath(cs.Driver.workingMountDir, srcVol) + dstPath := filepath.Join(snapInternalVolPath, snapshot.archiveName()) + klog.V(2).Infof("archiving %v -> %v", srcPath, dstPath) + out, err := exec.Command("tar", "-C", srcPath, "-czvf", dstPath, ".").CombinedOutput() + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to create archive for snapshot: %v: %v", err, string(out)) + } + klog.V(2).Infof("archived %s -> %s", srcPath, dstPath) + + var snapshotSize int64 + fi, err := os.Stat(dstPath) + if err != nil { + klog.Warningf("failed to determine snapshot size: %v", err) + } else { + snapshotSize = fi.Size() + } + return &csi.CreateSnapshotResponse{ + Snapshot: &csi.Snapshot{ + SnapshotId: snapshot.id, + SourceVolumeId: srcVol.id, + SizeBytes: snapshotSize, + CreationTime: timestamppb.Now(), + ReadyToUse: true, + }, + }, nil } func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + if len(req.GetSnapshotId()) == 0 { + return nil, status.Error(codes.InvalidArgument, "Snapshot ID is required for deletion") + } + snap, err := getNfsSnapFromID(req.GetSnapshotId()) + if err != nil { + // An invalid ID is treated as a snapshot that doesn't exist + klog.Warningf("failed to get nfs snapshot for id %v deletion: %v", req.GetSnapshotId(), err) + return &csi.DeleteSnapshotResponse{}, nil + } + + var volCap *csi.VolumeCapability + mountOptions := getMountOptions(req.GetSecrets()) + if mountOptions != "" { + 
klog.V(2).Infof("DeleteSnapshot: found mountOptions(%s) for snapshot(%s)", mountOptions, req.GetSnapshotId()) + volCap = &csi.VolumeCapability{ + AccessType: &csi.VolumeCapability_Mount{ + Mount: &csi.VolumeCapability_MountVolume{ + MountFlags: []string{mountOptions}, + }, + }, + } + } + vol := volumeFromSnapshot(snap) + if err = cs.internalMount(ctx, vol, nil, volCap); err != nil { + return nil, status.Errorf(codes.Internal, "failed to mount nfs server for snapshot deletion: %v", err) + } + defer func() { + if err = cs.internalUnmount(ctx, vol); err != nil { + klog.Warningf("failed to unmount nfs server after snapshot deletion: %v", err) + } + }() + + // delete snapshot archive + internalVolumePath := filepath.Join(getInternalVolumePath(cs.Driver.workingMountDir, vol), snap.archiveSubPath()) + klog.V(2).Infof("Removing snapshot archive at %v", internalVolumePath) + if err = os.RemoveAll(internalVolumePath); err != nil { + return nil, status.Errorf(codes.Internal, "failed to delete subdirectory: %v", err.Error()) + } + + return &csi.DeleteSnapshotResponse{}, nil } func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) { @@ -325,6 +473,47 @@ func (cs *ControllerServer) internalUnmount(ctx context.Context, vol *nfsVolume) return err } +func (cs *ControllerServer) copyFromSnapshot(ctx context.Context, req *csi.CreateVolumeRequest, dstVol *nfsVolume) error { + snap, err := getNfsSnapFromID(req.VolumeContentSource.GetSnapshot().GetSnapshotId()) + if err != nil { + return status.Error(codes.NotFound, err.Error()) + } + snapVol := volumeFromSnapshot(snap) + + var volCap *csi.VolumeCapability + if len(req.GetVolumeCapabilities()) > 0 { + volCap = req.GetVolumeCapabilities()[0] + } + + if err = cs.internalMount(ctx, snapVol, nil, volCap); err != nil { + return status.Errorf(codes.Internal, "failed to mount src nfs server for snapshot volume copy: %v", err) + } + defer func() { + if err = 
cs.internalUnmount(ctx, snapVol); err != nil { + klog.Warningf("failed to unmount src nfs server after snapshot volume copy: %v", err) + } + }() + if err = cs.internalMount(ctx, dstVol, nil, volCap); err != nil { + return status.Errorf(codes.Internal, "failed to mount dst nfs server for snapshot volume copy: %v", err) + } + defer func() { + if err = cs.internalUnmount(ctx, dstVol); err != nil { + klog.Warningf("failed to unmount dst nfs server after snapshot volume copy: %v", err) + } + }() + + // untar snapshot archive to dst path + snapPath := filepath.Join(getInternalVolumePath(cs.Driver.workingMountDir, snapVol), snap.archivePath()) + dstPath := getInternalVolumePath(cs.Driver.workingMountDir, dstVol) + klog.V(2).Infof("copy volume from snapshot %v -> %v", snapPath, dstPath) + out, err := exec.Command("tar", "-xzvf", snapPath, "-C", dstPath).CombinedOutput() + if err != nil { + return status.Errorf(codes.Internal, "failed to copy volume for snapshot: %v: %v", err, string(out)) + } + klog.V(2).Infof("volume copied from snapshot %v -> %v", snapPath, dstPath) + return nil +} + func (cs *ControllerServer) copyFromVolume(ctx context.Context, req *csi.CreateVolumeRequest, dstVol *nfsVolume) error { srcVol, err := getNfsVolFromID(req.GetVolumeContentSource().GetVolume().GetVolumeId()) if err != nil { @@ -340,26 +529,26 @@ func (cs *ControllerServer) copyFromVolume(ctx context.Context, req *csi.CreateV volCap = req.GetVolumeCapabilities()[0] } if err = cs.internalMount(ctx, srcVol, nil, volCap); err != nil { - return status.Errorf(codes.Internal, "failed to mount src nfs server: %v", err.Error()) + return status.Errorf(codes.Internal, "failed to mount src nfs server: %v", err) } defer func() { if err = cs.internalUnmount(ctx, srcVol); err != nil { - klog.Warningf("failed to unmount nfs server: %v", err.Error()) + klog.Warningf("failed to unmount nfs server: %v", err) } }() if err = cs.internalMount(ctx, dstVol, nil, volCap); err != nil { - return 
status.Errorf(codes.Internal, "failed to mount dst nfs server: %v", err.Error()) + return status.Errorf(codes.Internal, "failed to mount dst nfs server: %v", err) } defer func() { if err = cs.internalUnmount(ctx, dstVol); err != nil { - klog.Warningf("failed to unmount dst nfs server: %v", err.Error()) + klog.Warningf("failed to unmount dst nfs server: %v", err) } }() // recursive 'cp' with '-a' to handle symlinks out, err := exec.Command("cp", "-a", srcPath, dstPath).CombinedOutput() if err != nil { - return status.Error(codes.Internal, fmt.Sprintf("%v: %v", err, string(out))) + return status.Errorf(codes.Internal, "failed to copy volume %v: %v", err, string(out)) } klog.V(2).Infof("copied %s -> %s", srcPath, dstPath) return nil @@ -369,7 +558,7 @@ func (cs *ControllerServer) copyVolume(ctx context.Context, req *csi.CreateVolum vs := req.VolumeContentSource switch vs.Type.(type) { case *csi.VolumeContentSource_Snapshot: - return status.Error(codes.Unimplemented, "Currently only volume copy from another volume is supported") + return cs.copyFromSnapshot(ctx, req, vol) case *csi.VolumeContentSource_Volume: return cs.copyFromVolume(ctx, req, vol) default: @@ -377,6 +566,40 @@ func (cs *ControllerServer) copyVolume(ctx context.Context, req *csi.CreateVolum } } +// newNFSSnapshot Convert VolumeSnapshot parameters to a nfsSnapshot +func newNFSSnapshot(name string, params map[string]string, vol *nfsVolume) (*nfsSnapshot, error) { + server := vol.server + baseDir := vol.baseDir + for k, v := range params { + switch strings.ToLower(k) { + case paramServer: + server = v + case paramShare: + baseDir = v + } + } + + if server == "" { + return nil, fmt.Errorf("%v is a required parameter", paramServer) + } + snapshot := &nfsSnapshot{ + server: server, + baseDir: baseDir, + uuid: name, + } + if vol.subDir != "" { + snapshot.src = vol.subDir + } + if vol.uuid != "" { + snapshot.src = vol.uuid + } + if snapshot.src == "" { + return nil, fmt.Errorf("missing required source volume 
name") + } + snapshot.id = getSnapshotIDFromNfsSnapshot(snapshot) + return snapshot, nil +} + // newNFSVolume Convert VolumeCreate parameters to an nfsVolume func newNFSVolume(name string, size int64, params map[string]string, defaultOnDeletePolicy string) (*nfsVolume, error) { var server, baseDir, subDir, onDelete string @@ -470,6 +693,17 @@ func getVolumeIDFromNfsVol(vol *nfsVolume) string { return strings.Join(idElements, separator) } +// Given a nfsSnapshot, return a CSI snapshot id. +func getSnapshotIDFromNfsSnapshot(snap *nfsSnapshot) string { + idElements := make([]string, totalIDSnapElements) + idElements[idSnapServer] = strings.Trim(snap.server, "/") + idElements[idSnapBaseDir] = strings.Trim(snap.baseDir, "/") + idElements[idSnapUUID] = snap.uuid + idElements[idSnapArchivePath] = snap.uuid + idElements[idSnapArchiveName] = snap.src + return strings.Join(idElements, separator) +} + // Given a CSI volume id, return a nfsVolume // sample volume Id: // @@ -513,6 +747,25 @@ func getNfsVolFromID(id string) (*nfsVolume, error) { }, nil } +// Given a CSI snapshot ID, return a nfsSnapshot +// sample snapshot ID: +// +// nfs-server.default.svc.cluster.local#share#snapshot-016f784f-56f4-44d1-9041-5f59e82dbce1#snapshot-016f784f-56f4-44d1-9041-5f59e82dbce1#pvc-4bcbf944-b6f7-4bd0-b50f-3c3dd00efc64 +func getNfsSnapFromID(id string) (*nfsSnapshot, error) { + segments := strings.Split(id, separator) + if len(segments) == totalIDSnapElements { + return &nfsSnapshot{ + id: id, + server: segments[idSnapServer], + baseDir: segments[idSnapBaseDir], + src: segments[idSnapArchiveName], + uuid: segments[idSnapUUID], + }, nil + } + + return &nfsSnapshot{}, fmt.Errorf("failed to create nfsSnapshot from snapshot ID") +} + // isValidVolumeCapabilities validates the given VolumeCapability array is valid func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error { if len(volCaps) == 0 { @@ -525,3 +778,32 @@ func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) 
error { } return nil } + +// Validate snapshot after internal mount +func validateSnapshot(snapInternalVolPath string, snap *nfsSnapshot) error { + return filepath.WalkDir(snapInternalVolPath, func(path string, d fs.DirEntry, err error) error { + if path == snapInternalVolPath { + // skip root + return nil + } + if err != nil { + return err + } + if d.Name() != snap.archiveName() { + // there should be just one archive in the snapshot path and archive name should match + return status.Errorf(codes.AlreadyExists, "snapshot with the same name but different source volume ID already exists: found %q, desired %q", d.Name(), snap.archiveName()) + } + return nil + }) +} + +// Volume for snapshot internal mount/unmount +func volumeFromSnapshot(snap *nfsSnapshot) *nfsVolume { + return &nfsVolume{ + id: snap.id, + server: snap.server, + baseDir: snap.baseDir, + subDir: snap.baseDir, + uuid: snap.uuid, + } +} diff --git a/pkg/nfs/nfs.go b/pkg/nfs/nfs.go index 8230bbf2..e312f05c 100644 --- a/pkg/nfs/nfs.go +++ b/pkg/nfs/nfs.go @@ -88,6 +88,7 @@ func NewDriver(options *DriverOptions) *Driver { csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, csi.ControllerServiceCapability_RPC_CLONE_VOLUME, + csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, }) n.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{