Merge pull request #574 from andyzhangx/mount-lock
fix: add locks for nodeserver publish/unpublish operations
This commit is contained in:
commit
b5dcdf7924
@ -54,6 +54,13 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
|
||||
if len(targetPath) == 0 {
|
||||
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
|
||||
}
|
||||
|
||||
lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
|
||||
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
|
||||
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
|
||||
}
|
||||
defer ns.Driver.volumeLocks.Release(lockKey)
|
||||
|
||||
mountOptions := volCap.GetMount().GetMountFlags()
|
||||
if req.GetReadonly() {
|
||||
mountOptions = append(mountOptions, "ro")
|
||||
@ -156,6 +163,12 @@ func (ns *NodeServer) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpubl
|
||||
return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||
}
|
||||
|
||||
lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
|
||||
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
|
||||
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
|
||||
}
|
||||
defer ns.Driver.volumeLocks.Release(lockKey)
|
||||
|
||||
klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
|
||||
var err error
|
||||
extensiveMountPointCheck := true
|
||||
|
||||
@ -19,6 +19,7 @@ package nfs
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
@ -68,6 +69,7 @@ func TestNodePublishVolume(t *testing.T) {
|
||||
volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}
|
||||
alreadyMountedTarget := testutil.GetWorkDirPath("false_is_likely_exist_target", t)
|
||||
targetTest := testutil.GetWorkDirPath("target_test", t)
|
||||
lockKey := fmt.Sprintf("%s-%s", "vol_1", targetTest)
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
@ -93,6 +95,20 @@ func TestNodePublishVolume(t *testing.T) {
|
||||
VolumeId: "vol_1"},
|
||||
expectedErr: status.Error(codes.InvalidArgument, "Target path not provided"),
|
||||
},
|
||||
{
|
||||
desc: "[Error] Volume operation in progress",
|
||||
setup: func() {
|
||||
ns.Driver.volumeLocks.TryAcquire(lockKey)
|
||||
},
|
||||
req: csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
|
||||
VolumeId: "vol_1",
|
||||
VolumeContext: params,
|
||||
TargetPath: targetTest},
|
||||
expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
|
||||
cleanup: func() {
|
||||
ns.Driver.volumeLocks.Release(lockKey)
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "[Success] Stage target path missing",
|
||||
req: csi.NodePublishVolumeRequest{
|
||||
@ -198,6 +214,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
|
||||
errorTarget := testutil.GetWorkDirPath("error_is_likely_target", t)
|
||||
targetTest := testutil.GetWorkDirPath("target_test", t)
|
||||
targetFile := testutil.GetWorkDirPath("abc.go", t)
|
||||
lockKey := fmt.Sprintf("%s-%s", "vol_1", targetTest)
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
@ -220,6 +237,17 @@ func TestNodeUnpublishVolume(t *testing.T) {
|
||||
desc: "[Success] Volume not mounted",
|
||||
req: csi.NodeUnpublishVolumeRequest{TargetPath: targetFile, VolumeId: "vol_1"},
|
||||
},
|
||||
{
|
||||
desc: "[Error] Volume operation in progress",
|
||||
setup: func() {
|
||||
ns.Driver.volumeLocks.TryAcquire(lockKey)
|
||||
},
|
||||
req: csi.NodeUnpublishVolumeRequest{TargetPath: targetTest, VolumeId: "vol_1"},
|
||||
expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
|
||||
cleanup: func() {
|
||||
ns.Driver.volumeLocks.Release(lockKey)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Setup
|
||||
|
||||
@ -34,10 +34,11 @@ import (
|
||||
|
||||
//nolint:revive
|
||||
const (
|
||||
separator = "#"
|
||||
delete = "delete"
|
||||
retain = "retain"
|
||||
archive = "archive"
|
||||
separator = "#"
|
||||
delete = "delete"
|
||||
retain = "retain"
|
||||
archive = "archive"
|
||||
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
|
||||
)
|
||||
|
||||
var supportedOnDeleteValues = []string{"", delete, retain, archive}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user