fix: add locks for nodeserver publish/unpublish operations

This commit is contained in:
andyzhangx 2023-12-23 13:09:37 +00:00
parent aac860cd88
commit 37ab622a4d
3 changed files with 46 additions and 4 deletions

View File

@ -54,6 +54,13 @@ func (ns *NodeServer) NodePublishVolume(_ context.Context, req *csi.NodePublishV
if len(targetPath) == 0 { if len(targetPath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided") return nil, status.Error(codes.InvalidArgument, "Target path not provided")
} }
lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer ns.Driver.volumeLocks.Release(lockKey)
mountOptions := volCap.GetMount().GetMountFlags() mountOptions := volCap.GetMount().GetMountFlags()
if req.GetReadonly() { if req.GetReadonly() {
mountOptions = append(mountOptions, "ro") mountOptions = append(mountOptions, "ro")
@ -156,6 +163,12 @@ func (ns *NodeServer) NodeUnpublishVolume(_ context.Context, req *csi.NodeUnpubl
return nil, status.Error(codes.InvalidArgument, "Target path missing in request") return nil, status.Error(codes.InvalidArgument, "Target path missing in request")
} }
lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
if acquired := ns.Driver.volumeLocks.TryAcquire(lockKey); !acquired {
return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volumeID)
}
defer ns.Driver.volumeLocks.Release(lockKey)
klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath) klog.V(2).Infof("NodeUnpublishVolume: unmounting volume %s on %s", volumeID, targetPath)
var err error var err error
extensiveMountPointCheck := true extensiveMountPointCheck := true

View File

@ -19,6 +19,7 @@ package nfs
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"os" "os"
"reflect" "reflect"
"strings" "strings"
@ -68,6 +69,7 @@ func TestNodePublishVolume(t *testing.T) {
volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER} volumeCap := csi.VolumeCapability_AccessMode{Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER}
alreadyMountedTarget := testutil.GetWorkDirPath("false_is_likely_exist_target", t) alreadyMountedTarget := testutil.GetWorkDirPath("false_is_likely_exist_target", t)
targetTest := testutil.GetWorkDirPath("target_test", t) targetTest := testutil.GetWorkDirPath("target_test", t)
lockKey := fmt.Sprintf("%s-%s", "vol_1", targetTest)
tests := []struct { tests := []struct {
desc string desc string
@ -93,6 +95,20 @@ func TestNodePublishVolume(t *testing.T) {
VolumeId: "vol_1"}, VolumeId: "vol_1"},
expectedErr: status.Error(codes.InvalidArgument, "Target path not provided"), expectedErr: status.Error(codes.InvalidArgument, "Target path not provided"),
}, },
{
desc: "[Error] Volume operation in progress",
setup: func() {
ns.Driver.volumeLocks.TryAcquire(lockKey)
},
req: csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap},
VolumeId: "vol_1",
VolumeContext: params,
TargetPath: targetTest},
expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
cleanup: func() {
ns.Driver.volumeLocks.Release(lockKey)
},
},
{ {
desc: "[Success] Stage target path missing", desc: "[Success] Stage target path missing",
req: csi.NodePublishVolumeRequest{ req: csi.NodePublishVolumeRequest{
@ -198,6 +214,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
errorTarget := testutil.GetWorkDirPath("error_is_likely_target", t) errorTarget := testutil.GetWorkDirPath("error_is_likely_target", t)
targetTest := testutil.GetWorkDirPath("target_test", t) targetTest := testutil.GetWorkDirPath("target_test", t)
targetFile := testutil.GetWorkDirPath("abc.go", t) targetFile := testutil.GetWorkDirPath("abc.go", t)
lockKey := fmt.Sprintf("%s-%s", "vol_1", targetTest)
tests := []struct { tests := []struct {
desc string desc string
@ -220,6 +237,17 @@ func TestNodeUnpublishVolume(t *testing.T) {
desc: "[Success] Volume not mounted", desc: "[Success] Volume not mounted",
req: csi.NodeUnpublishVolumeRequest{TargetPath: targetFile, VolumeId: "vol_1"}, req: csi.NodeUnpublishVolumeRequest{TargetPath: targetFile, VolumeId: "vol_1"},
}, },
{
desc: "[Error] Volume operation in progress",
setup: func() {
ns.Driver.volumeLocks.TryAcquire(lockKey)
},
req: csi.NodeUnpublishVolumeRequest{TargetPath: targetTest, VolumeId: "vol_1"},
expectedErr: status.Error(codes.Aborted, fmt.Sprintf(volumeOperationAlreadyExistsFmt, "vol_1")),
cleanup: func() {
ns.Driver.volumeLocks.Release(lockKey)
},
},
} }
// Setup // Setup

View File

@ -34,10 +34,11 @@ import (
//nolint:revive //nolint:revive
const ( const (
separator = "#" separator = "#"
delete = "delete" delete = "delete"
retain = "retain" retain = "retain"
archive = "archive" archive = "archive"
volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists"
) )
var supportedOnDeleteValues = []string{"", delete, retain, archive} var supportedOnDeleteValues = []string{"", delete, retain, archive}