diff --git a/pkg/nfs/nodeserver.go b/pkg/nfs/nodeserver.go index 86559dbf..f9f5fc90 100644 --- a/pkg/nfs/nodeserver.go +++ b/pkg/nfs/nodeserver.go @@ -54,6 +54,13 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV if len(targetPath) == 0 { return nil, status.Error(codes.InvalidArgument, "Target path not provided") } + + lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath) + if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired { + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID) + } + defer ns.Driver.volumeLocks.Release(lockKey) + mountOptions := volCap.GetMount().GetMountFlags() if req.GetReadonly() { mountOptions = append(mountOptions, "ro") @@ -156,6 +163,12 @@ func (ns *NodeServer) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpubl return nil, status.Error(codes.InvalidArgument, "Target path missing in request") } + lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath) + if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired { + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID) + } + defer ns.Driver.volumeLocks.Release(lockKey) + klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath) var err error extensiveMountPointCheck := true diff --git a/pkg/nfs/nodeserver_test.go b/pkg/nfs/nodeserver_test.go index dccb05f2..2d8bb9ca 100644 --- a/pkg/nfs/nodeserver_test.go +++ b/pkg/nfs/nodeserver_test.go @@ -19,6 +19,7 @@ package nfs import ( "context" "errors" + "fmt" "os" "reflect" "strings" @@ -68,6 +69,7 @@ func TestNodePublishVolume(t *testing.T) { volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER} alreadyMountedTarget := testutil.GetWorkDirPath("false_is_likely_exist_target", t) targetTest := testutil.GetWorkDirPath("target_test", t) + lockKey := fmt.Sprintf("%s-%s", "vol_1", targetTest) tests := []struct { desc string @@ -93,6 +95,20 @@ func TestNodePublishVolume(t *testing.T) { VolumeId: "vol_1"}, expectedErr: status.Error(codes.InvalidArgument, "Target path not provided"), }, + { + desc: "[Error] Volume operation in progress", + setup: func() { + ns.Driver.volumeLocks.TryAcquire(lockKey) + }, + req: csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, + VolumeId: "vol_1", + VolumeContext: params, + TargetPath: targetTest}, + expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")), + cleanup: func() { + ns.Driver.volumeLocks.Release(lockKey) + }, + }, { desc: "[Success] Stage target path missing", req: csi.NodePublishVolumeRequest{ @@ -198,6 +214,7 @@ func TestNodeUnpublishVolume(t *testing.T) { errorTarget := testutil.GetWorkDirPath("error_is_likely_target", t) targetTest := testutil.GetWorkDirPath("target_test", t) targetFile := testutil.GetWorkDirPath("abc.go", t) + lockKey := fmt.Sprintf("%s-%s", "vol_1", targetTest) tests := []struct { desc string @@ -220,6 +237,17 @@ func TestNodeUnpublishVolume(t *testing.T) { desc: "[Success] Volume not mounted", req: csi.NodeUnpublishVolumeRequest{TargetPath: targetFile, VolumeId: "vol_1"}, }, + { + desc: "[Error] Volume operation in progress", + setup: func() { + ns.Driver.volumeLocks.TryAcquire(lockKey) + }, + req: csi.NodeUnpublishVolumeRequest{TargetPath: targetTest, VolumeId: "vol_1"}, + expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")), + cleanup: func() { + ns.Driver.volumeLocks.Release(lockKey) + }, + }, } // Setup diff --git a/pkg/nfs/utils.go b/pkg/nfs/utils.go index a8920fd7..f71605d2 100644 --- a/pkg/nfs/utils.go +++ b/pkg/nfs/utils.go @@ -34,10 +34,11 @@ import ( //nolint:revive const ( - separator = "#" - delete = "delete" - retain = "retain" - archive = "archive" + separator = "#" + delete = "delete" + retain = "retain" + archive = "archive" + volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists" ) var supportedOnDeleteValues = []string{"", delete, retain, archive}