feat: implement volume snapshots
This commit is contained in:
parent
aeb0a8d410
commit
ce66bf8a85
@ -18,6 +18,7 @@ package nfs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
@ -29,6 +30,7 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
@ -59,6 +61,35 @@ type nfsVolume struct {
|
||||
onDelete string
|
||||
}
|
||||
|
||||
// nfsSnapshot is an internal representation of a volume snapshot
|
||||
// created by the provisioner.
|
||||
type nfsSnapshot struct {
|
||||
// Snapshot id.
|
||||
id string
|
||||
// Address of the NFS server.
|
||||
// Matches paramServer.
|
||||
server string
|
||||
// Base directory of the NFS server to create snapshots under
|
||||
// Matches paramShare.
|
||||
baseDir string
|
||||
// Snapshot name.
|
||||
uuid string
|
||||
// Source volume.
|
||||
src string
|
||||
}
|
||||
|
||||
func (snap nfsSnapshot) archiveSubPath() string {
|
||||
return snap.uuid
|
||||
}
|
||||
|
||||
func (snap nfsSnapshot) archiveName() string {
|
||||
return fmt.Sprintf("%v.tar.gz", snap.src)
|
||||
}
|
||||
|
||||
func (snap nfsSnapshot) archivePath() string {
|
||||
return filepath.Join(snap.archiveSubPath(), snap.archiveName())
|
||||
}
|
||||
|
||||
// Ordering of elements in the CSI volume id.
|
||||
// ID is of the form {server}/{baseDir}/{subDir}.
|
||||
// TODO: This volume id format limits baseDir and
|
||||
@ -74,6 +105,19 @@ const (
|
||||
totalIDElements // Always last
|
||||
)
|
||||
|
||||
// Ordering of elements in the CSI snapshot id.
|
||||
// ID is of the form {server}/{baseDir}/{snapName}/{srcVolumeName}.
|
||||
// Adding a new element should always go at the end
|
||||
// before totalSnapIDElements
|
||||
const (
|
||||
idSnapServer = iota
|
||||
idSnapBaseDir
|
||||
idSnapUUID
|
||||
idSnapArchivePath
|
||||
idSnapArchiveName
|
||||
totalIDSnapElements // Always last
|
||||
)
|
||||
|
||||
// CreateVolume create a volume
|
||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||
name := req.GetName()
|
||||
@ -263,11 +307,115 @@ func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "")
|
||||
if len(req.GetName()) == 0 {
|
||||
return nil, status.Error(codes.InvalidArgument, "CreateSnapshot name must be provided")
|
||||
}
|
||||
if len(req.GetSourceVolumeId()) == 0 {
|
||||
return nil, status.Error(codes.InvalidArgument, "CreateSnapshot source volume ID must be provided")
|
||||
}
|
||||
|
||||
srcVol, err := getNfsVolFromID(req.GetSourceVolumeId())
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "failed to create source volume: %v", err)
|
||||
}
|
||||
snapshot, err := newNFSSnapshot(req.GetName(), req.GetParameters(), srcVol)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.NotFound, "failed to create nfsSnapshot: %v", err)
|
||||
}
|
||||
snapVol := volumeFromSnapshot(snapshot)
|
||||
if err = cs.internalMount(ctx, snapVol, nil, nil); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to mount snapshot nfs server: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = cs.internalUnmount(ctx, snapVol); err != nil {
|
||||
klog.Warningf("failed to unmount snapshot nfs server: %v", err)
|
||||
}
|
||||
}()
|
||||
snapInternalVolPath := filepath.Join(getInternalVolumePath(cs.Driver.workingMountDir, snapVol), snapshot.archiveSubPath())
|
||||
if err = os.MkdirAll(snapInternalVolPath, 0777); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err)
|
||||
}
|
||||
if err := validateSnapshot(snapInternalVolPath, snapshot); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = cs.internalMount(ctx, srcVol, nil, nil); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to mount src nfs server: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = cs.internalUnmount(ctx, srcVol); err != nil {
|
||||
klog.Warningf("failed to unmount src nfs server: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
srcPath := getInternalVolumePath(cs.Driver.workingMountDir, srcVol)
|
||||
dstPath := filepath.Join(snapInternalVolPath, snapshot.archiveName())
|
||||
klog.V(2).Infof("archiving %v -> %v", srcPath, dstPath)
|
||||
out, err := exec.Command("tar", "-C", srcPath, "-czvf", dstPath, ".").CombinedOutput()
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to create archive for snapshot: %v: %v", err, string(out))
|
||||
}
|
||||
klog.V(2).Infof("archived %s -> %s", srcPath, dstPath)
|
||||
|
||||
var snapshotSize int64
|
||||
fi, err := os.Stat(dstPath)
|
||||
if err != nil {
|
||||
klog.Warningf("failed to determine snapshot size: %v", err)
|
||||
} else {
|
||||
snapshotSize = fi.Size()
|
||||
}
|
||||
return &csi.CreateSnapshotResponse{
|
||||
Snapshot: &csi.Snapshot{
|
||||
SnapshotId: snapshot.id,
|
||||
SourceVolumeId: srcVol.id,
|
||||
SizeBytes: snapshotSize,
|
||||
CreationTime: timestamppb.Now(),
|
||||
ReadyToUse: true,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "")
|
||||
if len(req.GetSnapshotId()) == 0 {
|
||||
return nil, status.Error(codes.InvalidArgument, "Snapshot ID is required for deletion")
|
||||
}
|
||||
snap, err := getNfsSnapFromID(req.GetSnapshotId())
|
||||
if err != nil {
|
||||
// An invalid ID should be treated as doesn't exist
|
||||
klog.Warningf("failed to get nfs snapshot for id %v deletion: %v", req.GetSnapshotId(), err)
|
||||
return &csi.DeleteSnapshotResponse{}, nil
|
||||
}
|
||||
|
||||
var volCap *csi.VolumeCapability
|
||||
mountOptions := getMountOptions(req.GetSecrets())
|
||||
if mountOptions != "" {
|
||||
klog.V(2).Infof("DeleteSnapshot: found mountOptions(%s) for snapshot(%s)", mountOptions, req.GetSnapshotId())
|
||||
volCap = &csi.VolumeCapability{
|
||||
AccessType: &csi.VolumeCapability_Mount{
|
||||
Mount: &csi.VolumeCapability_MountVolume{
|
||||
MountFlags: []string{mountOptions},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
vol := volumeFromSnapshot(snap)
|
||||
if err = cs.internalMount(ctx, vol, nil, volCap); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to mount nfs server for snapshot deletion: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = cs.internalUnmount(ctx, vol); err != nil {
|
||||
klog.Warningf("failed to unmount nfs server after snapshot deletion: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// delete snapshot archive
|
||||
internalVolumePath := filepath.Join(getInternalVolumePath(cs.Driver.workingMountDir, vol), snap.archiveSubPath())
|
||||
klog.V(2).Infof("Removing snapshot archive at %v", internalVolumePath)
|
||||
if err = os.RemoveAll(internalVolumePath); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed to delete subdirectory: %v", err.Error())
|
||||
}
|
||||
|
||||
return &csi.DeleteSnapshotResponse{}, nil
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
|
||||
@ -325,6 +473,47 @@ func (cs *ControllerServer) internalUnmount(ctx context.Context, vol *nfsVolume)
|
||||
return err
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) copyFromSnapshot(ctx context.Context, req *csi.CreateVolumeRequest, dstVol *nfsVolume) error {
|
||||
snap, err := getNfsSnapFromID(req.VolumeContentSource.GetSnapshot().GetSnapshotId())
|
||||
if err != nil {
|
||||
return status.Error(codes.NotFound, err.Error())
|
||||
}
|
||||
snapVol := volumeFromSnapshot(snap)
|
||||
|
||||
var volCap *csi.VolumeCapability
|
||||
if len(req.GetVolumeCapabilities()) > 0 {
|
||||
volCap = req.GetVolumeCapabilities()[0]
|
||||
}
|
||||
|
||||
if err = cs.internalMount(ctx, snapVol, nil, volCap); err != nil {
|
||||
return status.Errorf(codes.Internal, "failed to mount src nfs server for snapshot volume copy: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = cs.internalUnmount(ctx, snapVol); err != nil {
|
||||
klog.Warningf("failed to unmount src nfs server after snapshot volume copy: %v", err)
|
||||
}
|
||||
}()
|
||||
if err = cs.internalMount(ctx, dstVol, nil, volCap); err != nil {
|
||||
return status.Errorf(codes.Internal, "failed to mount dst nfs server for snapshot volume copy: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = cs.internalUnmount(ctx, dstVol); err != nil {
|
||||
klog.Warningf("failed to unmount dst nfs server after snapshot volume copy: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// untar snapshot archive to dst path
|
||||
snapPath := filepath.Join(getInternalVolumePath(cs.Driver.workingMountDir, snapVol), snap.archivePath())
|
||||
dstPath := getInternalVolumePath(cs.Driver.workingMountDir, dstVol)
|
||||
klog.V(2).Infof("copy volume from snapshot %v -> %v", snapPath, dstPath)
|
||||
out, err := exec.Command("tar", "-xzvf", snapPath, "-C", dstPath).CombinedOutput()
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, "failed to copy volume for snapshot: %v: %v", err, string(out))
|
||||
}
|
||||
klog.V(2).Infof("volume copied from snapshot %v -> %v", snapPath, dstPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) copyFromVolume(ctx context.Context, req *csi.CreateVolumeRequest, dstVol *nfsVolume) error {
|
||||
srcVol, err := getNfsVolFromID(req.GetVolumeContentSource().GetVolume().GetVolumeId())
|
||||
if err != nil {
|
||||
@ -340,26 +529,26 @@ func (cs *ControllerServer) copyFromVolume(ctx context.Context, req *csi.CreateV
|
||||
volCap = req.GetVolumeCapabilities()[0]
|
||||
}
|
||||
if err = cs.internalMount(ctx, srcVol, nil, volCap); err != nil {
|
||||
return status.Errorf(codes.Internal, "failed to mount src nfs server: %v", err.Error())
|
||||
return status.Errorf(codes.Internal, "failed to mount src nfs server: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = cs.internalUnmount(ctx, srcVol); err != nil {
|
||||
klog.Warningf("failed to unmount nfs server: %v", err.Error())
|
||||
klog.Warningf("failed to unmount nfs server: %v", err)
|
||||
}
|
||||
}()
|
||||
if err = cs.internalMount(ctx, dstVol, nil, volCap); err != nil {
|
||||
return status.Errorf(codes.Internal, "failed to mount dst nfs server: %v", err.Error())
|
||||
return status.Errorf(codes.Internal, "failed to mount dst nfs server: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err = cs.internalUnmount(ctx, dstVol); err != nil {
|
||||
klog.Warningf("failed to unmount dst nfs server: %v", err.Error())
|
||||
klog.Warningf("failed to unmount dst nfs server: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// recursive 'cp' with '-a' to handle symlinks
|
||||
out, err := exec.Command("cp", "-a", srcPath, dstPath).CombinedOutput()
|
||||
if err != nil {
|
||||
return status.Error(codes.Internal, fmt.Sprintf("%v: %v", err, string(out)))
|
||||
return status.Errorf(codes.Internal, "failed to copy volume %v: %v", err, string(out))
|
||||
}
|
||||
klog.V(2).Infof("copied %s -> %s", srcPath, dstPath)
|
||||
return nil
|
||||
@ -369,7 +558,7 @@ func (cs *ControllerServer) copyVolume(ctx context.Context, req *csi.CreateVolum
|
||||
vs := req.VolumeContentSource
|
||||
switch vs.Type.(type) {
|
||||
case *csi.VolumeContentSource_Snapshot:
|
||||
return status.Error(codes.Unimplemented, "Currently only volume copy from another volume is supported")
|
||||
return cs.copyFromSnapshot(ctx, req, vol)
|
||||
case *csi.VolumeContentSource_Volume:
|
||||
return cs.copyFromVolume(ctx, req, vol)
|
||||
default:
|
||||
@ -377,6 +566,40 @@ func (cs *ControllerServer) copyVolume(ctx context.Context, req *csi.CreateVolum
|
||||
}
|
||||
}
|
||||
|
||||
// newNFSSnapshot Convert VolumeSnapshot parameters to a nfsSnapshot
|
||||
func newNFSSnapshot(name string, params map[string]string, vol *nfsVolume) (*nfsSnapshot, error) {
|
||||
server := vol.server
|
||||
baseDir := vol.baseDir
|
||||
for k, v := range params {
|
||||
switch strings.ToLower(k) {
|
||||
case paramServer:
|
||||
server = v
|
||||
case paramShare:
|
||||
baseDir = v
|
||||
}
|
||||
}
|
||||
|
||||
if server == "" {
|
||||
return nil, fmt.Errorf("%v is a required parameter", paramServer)
|
||||
}
|
||||
snapshot := &nfsSnapshot{
|
||||
server: server,
|
||||
baseDir: baseDir,
|
||||
uuid: name,
|
||||
}
|
||||
if vol.subDir != "" {
|
||||
snapshot.src = vol.subDir
|
||||
}
|
||||
if vol.uuid != "" {
|
||||
snapshot.src = vol.uuid
|
||||
}
|
||||
if snapshot.src == "" {
|
||||
return nil, fmt.Errorf("missing required source volume name")
|
||||
}
|
||||
snapshot.id = getSnapshotIDFromNfsSnapshot(snapshot)
|
||||
return snapshot, nil
|
||||
}
|
||||
|
||||
// newNFSVolume Convert VolumeCreate parameters to an nfsVolume
|
||||
func newNFSVolume(name string, size int64, params map[string]string, defaultOnDeletePolicy string) (*nfsVolume, error) {
|
||||
var server, baseDir, subDir, onDelete string
|
||||
@ -470,6 +693,17 @@ func getVolumeIDFromNfsVol(vol *nfsVolume) string {
|
||||
return strings.Join(idElements, separator)
|
||||
}
|
||||
|
||||
// Given a nfsSnapshot, return a CSI snapshot id.
|
||||
func getSnapshotIDFromNfsSnapshot(snap *nfsSnapshot) string {
|
||||
idElements := make([]string, totalIDSnapElements)
|
||||
idElements[idSnapServer] = strings.Trim(snap.server, "/")
|
||||
idElements[idSnapBaseDir] = strings.Trim(snap.baseDir, "/")
|
||||
idElements[idSnapUUID] = snap.uuid
|
||||
idElements[idSnapArchivePath] = snap.uuid
|
||||
idElements[idSnapArchiveName] = snap.src
|
||||
return strings.Join(idElements, separator)
|
||||
}
|
||||
|
||||
// Given a CSI volume id, return a nfsVolume
|
||||
// sample volume Id:
|
||||
//
|
||||
@ -513,6 +747,25 @@ func getNfsVolFromID(id string) (*nfsVolume, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Given a CSI snapshot ID, return a nfsSnapshot
|
||||
// sample snapshot ID:
|
||||
//
|
||||
// nfs-server.default.svc.cluster.local#share#snapshot-016f784f-56f4-44d1-9041-5f59e82dbce1#snapshot-016f784f-56f4-44d1-9041-5f59e82dbce1#pvc-4bcbf944-b6f7-4bd0-b50f-3c3dd00efc64
|
||||
func getNfsSnapFromID(id string) (*nfsSnapshot, error) {
|
||||
segments := strings.Split(id, separator)
|
||||
if len(segments) == totalIDSnapElements {
|
||||
return &nfsSnapshot{
|
||||
id: id,
|
||||
server: segments[idSnapServer],
|
||||
baseDir: segments[idSnapBaseDir],
|
||||
src: segments[idSnapArchiveName],
|
||||
uuid: segments[idSnapUUID],
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &nfsSnapshot{}, fmt.Errorf("failed to create nfsSnapshot from snapshot ID")
|
||||
}
|
||||
|
||||
// isValidVolumeCapabilities validates the given VolumeCapability array is valid
|
||||
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
|
||||
if len(volCaps) == 0 {
|
||||
@ -525,3 +778,32 @@ func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate snapshot after internal mount
|
||||
func validateSnapshot(snapInternalVolPath string, snap *nfsSnapshot) error {
|
||||
return filepath.WalkDir(snapInternalVolPath, func(path string, d fs.DirEntry, err error) error {
|
||||
if path == snapInternalVolPath {
|
||||
// skip root
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if d.Name() != snap.archiveName() {
|
||||
// there should be just one archive in the snapshot path and archive name should match
|
||||
return status.Errorf(codes.AlreadyExists, "snapshot with the same name but different source volume ID already exists: found %q, desired %q", d.Name(), snap.archiveName())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Volume for snapshot internal mount/unmount
|
||||
func volumeFromSnapshot(snap *nfsSnapshot) *nfsVolume {
|
||||
return &nfsVolume{
|
||||
id: snap.id,
|
||||
server: snap.server,
|
||||
baseDir: snap.baseDir,
|
||||
subDir: snap.baseDir,
|
||||
uuid: snap.uuid,
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,6 +88,7 @@ func NewDriver(options *DriverOptions) *Driver {
|
||||
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
|
||||
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
|
||||
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
|
||||
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
|
||||
})
|
||||
|
||||
n.AddNodeServiceCapabilities([]csi.NodeServiceCapability_RPC_Type{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user