Merge pull request #53 from mayankshah1607/mayank/create-volume

feat: implement `CreateVolume`
This commit is contained in:
Kubernetes Prow Robot 2020-10-18 07:48:13 -07:00 committed by GitHub
commit 108aef3ed2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 531 additions and 14 deletions

View File

@ -11,9 +11,6 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Install nfs-common
run: |
sudo apt-get install -y nfs-common
- name: Run tests
run: |
make sanity-test
sudo make sanity-test

View File

@ -70,6 +70,7 @@ spec:
allowPrivilegeEscalation: true
imagePullPolicy: IfNotPresent
args:
- "-v=5"
- "--nodeid=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"
env:

View File

@ -45,6 +45,7 @@ spec:
allowPrivilegeEscalation: true
image: quay.io/k8scsi/nfsplugin:v2.0.0
args :
- "-v=5"
- "--nodeid=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"
env:

View File

@ -0,0 +1,42 @@
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: statefulset-nfs
labels:
app: nginx
spec:
serviceName: statefulset-nfs
replicas: 1
template:
metadata:
labels:
app: nginx
spec:
nodeSelector:
"kubernetes.io/os": linux
containers:
- name: statefulset-nfs
image: mcr.microsoft.com/oss/nginx/nginx:1.17.3-alpine
command:
- "/bin/sh"
- "-c"
- while true; do echo $(date) >> /mnt/nfs/outfile; sleep 1; done
volumeMounts:
- name: persistent-storage
mountPath: /mnt/nfs
updateStrategy:
type: RollingUpdate
selector:
matchLabels:
app: nginx
volumeClaimTemplates:
- metadata:
name: persistent-storage
annotations:
volume.beta.kubernetes.io/storage-class: nfs-csi
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 10Gi

View File

@ -0,0 +1,14 @@
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: nfs-csi
provisioner: nfs.csi.k8s.io
parameters:
server: nfs-server.default.svc.cluster.local
share: /
reclaimPolicy: Retain # only retain is supported
volumeBindingMode: Immediate
mountOptions:
- hard
- nfsvers=4.1

View File

@ -1,6 +1,11 @@
package nfs
import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"golang.org/x/net/context"
@ -10,14 +15,126 @@ import (
type ControllerServer struct {
Driver *nfsDriver
// Working directory for the provisioner to temporarily mount nfs shares at
workingMountDir string
}
// nfsVolume is an internal representation of a volume
// created by the provisioner.
type nfsVolume struct {
// Volume id
id string
// Address of the NFS server.
// Matches paramServer.
server string
// Base directory of the NFS server to create volumes under
// Matches paramShare.
baseDir string
// Subdirectory of the NFS server to create volumes under
subDir string
// size of volume
size int64
}
// Ordering of elements in the CSI volume id.
// ID is of the form {server}/{baseDir}/{subDir}.
// TODO: This volume id format limits baseDir and
// subDir to only be one directory deep.
// Adding a new element should always go at the end
// before totalIDElements
const (
idServer = iota
idBaseDir
idSubDir
totalIDElements // Always last
)
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
// Validate arguments
name := req.GetName()
if len(name) == 0 {
return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
}
if err := cs.validateVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
reqCapacity := req.GetCapacityRange().GetRequiredBytes()
nfsVol, err := cs.newNFSVolume(name, reqCapacity, req.GetParameters())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// Mount nfs base share so we can create a subdirectory
if err = cs.internalMount(ctx, nfsVol); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
}
defer func() {
if err = cs.internalUnmount(ctx, nfsVol); err != nil {
glog.Warningf("failed to unmount nfs server: %v", err.Error())
}
}()
// Create subdirectory under base-dir
// TODO: revisit permissions
internalVolumePath := cs.getInternalVolumePath(nfsVol)
if err = os.Mkdir(internalVolumePath, 0777); err != nil {
return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
}
// Remove capacity setting when provisioner 1.4.0 is available with fix for
// https://github.com/kubernetes-csi/external-provisioner/pull/271
return &csi.CreateVolumeResponse{Volume: cs.nfsVolToCSI(nfsVol, reqCapacity)}, nil
}
func validCapacity(requested int64, volumePath string) error {
metrics, err := getVolumeMetrics(volumePath)
if err != nil {
return err
}
capacity, ok := metrics.Capacity.AsInt64()
if !ok {
return status.Errorf(codes.Internal, "failed to get capacity")
}
if capacity != requested {
return status.Errorf(codes.AlreadyExists, "volume at this path exists with a different capacity")
}
return nil
}
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
volumeId := req.GetVolumeId()
if volumeId == "" {
return nil, status.Error(codes.InvalidArgument, "volume id is empty")
}
nfsVol, err := cs.getNfsVolFromId(volumeId)
if err != nil {
// An invalid ID should be treated as doesn't exist
glog.V(5).Infof("failed to get nfs volume for volume id %v deletion: %v", volumeId, err)
return &csi.DeleteVolumeResponse{}, nil
}
// Mount nfs base share so we can delete the subdirectory
if err = cs.internalMount(ctx, nfsVol); err != nil {
return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
}
defer func() {
if err = cs.internalUnmount(ctx, nfsVol); err != nil {
glog.Warningf("failed to unmount nfs server: %v", err.Error())
}
}()
// Delete subdirectory under base-dir
internalVolumePath := cs.getInternalVolumePath(nfsVol)
glog.V(4).Infof("Removing subdirectory at %v", internalVolumePath)
if err = os.RemoveAll(internalVolumePath); err != nil {
return nil, status.Errorf(codes.Internal, "failed to delete subdirectory: %v", err.Error())
}
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
@ -73,3 +190,173 @@ func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (cs *ControllerServer) validateVolumeCapabilities(caps []*csi.VolumeCapability) error {
if len(caps) == 0 {
return fmt.Errorf("volume capabilities must be provided")
}
for _, c := range caps {
if err := cs.validateVolumeCapability(c); err != nil {
return err
}
}
return nil
}
func (cs *ControllerServer) validateVolumeCapability(c *csi.VolumeCapability) error {
if c == nil {
return fmt.Errorf("volume capability must be provided")
}
// Validate access mode
accessMode := c.GetAccessMode()
if accessMode == nil {
return fmt.Errorf("volume capability access mode not set")
}
if !cs.Driver.cap[accessMode.Mode] {
return fmt.Errorf("driver does not support access mode: %v", accessMode.Mode.String())
}
// Validate access type
accessType := c.GetAccessType()
if accessType == nil {
return fmt.Errorf("volume capability access type not set")
}
return nil
}
// Mount nfs server at base-dir
func (cs *ControllerServer) internalMount(ctx context.Context, vol *nfsVolume) error {
sharePath := filepath.Join(string(filepath.Separator) + vol.baseDir)
targetPath := cs.getInternalMountPath(vol)
stdVolCap := csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
}
glog.V(4).Infof("internally mounting %v:%v at %v", vol.server, sharePath, targetPath)
_, err := cs.Driver.ns.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
TargetPath: targetPath,
VolumeContext: map[string]string{
paramServer: vol.server,
paramShare: sharePath,
},
VolumeCapability: &stdVolCap,
VolumeId: vol.id,
})
return err
}
// Unmount nfs server at base-dir
func (cs *ControllerServer) internalUnmount(ctx context.Context, vol *nfsVolume) error {
targetPath := cs.getInternalMountPath(vol)
// Unmount nfs server at base-dir
glog.V(4).Infof("internally unmounting %v", targetPath)
_, err := cs.Driver.ns.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
TargetPath: cs.getInternalMountPath(vol),
})
return err
}
// Convert VolumeCreate parameters to an nfsVolume
func (cs *ControllerServer) newNFSVolume(name string, size int64, params map[string]string) (*nfsVolume, error) {
var (
server string
baseDir string
)
// Validate parameters (case-insensitive).
// TODO do more strict validation.
for k, v := range params {
switch strings.ToLower(k) {
case paramServer:
server = v
case paramShare:
baseDir = v
default:
return nil, fmt.Errorf("invalid parameter %q", k)
}
}
// Validate required parameters
if server == "" {
return nil, fmt.Errorf("%v is a required parameter", paramServer)
}
if baseDir == "" {
return nil, fmt.Errorf("%v is a required parameter", paramShare)
}
vol := &nfsVolume{
server: server,
baseDir: baseDir,
subDir: name,
size: size,
}
vol.id = cs.getVolumeIdFromNfsVol(vol)
return vol, nil
}
// Get working directory for CreateVolume and DeleteVolume
func (cs *ControllerServer) getInternalMountPath(vol *nfsVolume) string {
// use default if empty
if cs.workingMountDir == "" {
cs.workingMountDir = "/tmp"
}
return filepath.Join(cs.workingMountDir, vol.subDir)
}
// Get internal path where the volume is created
// The reason why the internal path is "workingDir/subDir/subDir" is because:
// * the semantic is actually "workingDir/volId/subDir" and volId == subDir.
// * we need a mount directory per volId because you can have multiple
// CreateVolume calls in parallel and they may use the same underlying share.
// Instead of refcounting how many CreateVolume calls are using the same
// share, it's simpler to just do a mount per request.
func (cs *ControllerServer) getInternalVolumePath(vol *nfsVolume) string {
return filepath.Join(cs.getInternalMountPath(vol), vol.subDir)
}
// Get user-visible share path for the volume
func (cs *ControllerServer) getVolumeSharePath(vol *nfsVolume) string {
return filepath.Join(string(filepath.Separator), vol.baseDir, vol.subDir)
}
// Convert into nfsVolume into a csi.Volume
func (cs *ControllerServer) nfsVolToCSI(vol *nfsVolume, reqCapacity int64) *csi.Volume {
return &csi.Volume{
CapacityBytes: reqCapacity,
VolumeId: vol.id,
VolumeContext: map[string]string{
paramServer: vol.server,
paramShare: cs.getVolumeSharePath(vol),
},
}
}
// Given a nfsVolume, return a CSI volume id
func (cs *ControllerServer) getVolumeIdFromNfsVol(vol *nfsVolume) string {
idElements := make([]string, totalIDElements)
idElements[idServer] = vol.server
idElements[idBaseDir] = vol.baseDir
idElements[idSubDir] = vol.subDir
return strings.Join(idElements, "/")
}
// Given a CSI volume id, return a nfsVolume
func (cs *ControllerServer) getNfsVolFromId(id string) (*nfsVolume, error) {
tokens := strings.Split(id, "/")
if len(tokens) != totalIDElements {
return nil, fmt.Errorf("volume id %q unexpected format: got %v tokens", id, len(tokens))
}
return &nfsVolume{
id: id,
server: tokens[idServer],
baseDir: tokens[idBaseDir],
subDir: tokens[idSubDir],
}, nil
}

View File

@ -0,0 +1,159 @@
package nfs
import (
"os"
"path/filepath"
"reflect"
"testing"
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"k8s.io/utils/mount"
)
const (
testServer = "test-server"
testBaseDir = "test-base-dir"
testCSIVolume = "test-csi"
testVolumeId = "test-server/test-base-dir/test-csi"
)
// for Windows support in the future
var (
testShare = filepath.Join(string(filepath.Separator), testBaseDir, string(filepath.Separator), testCSIVolume)
)
func initTestController(t *testing.T) *ControllerServer {
var perm *uint32
mounter := &mount.FakeMounter{MountPoints: []mount.MountPoint{}}
driver := NewNFSdriver("", "", perm)
driver.ns = NewNodeServer(driver, mounter)
return NewControllerServer(driver)
}
func TestCreateVolume(t *testing.T) {
cases := []struct {
name string
req *csi.CreateVolumeRequest
resp *csi.CreateVolumeResponse
expectErr bool
}{
{
name: "valid defaults",
req: &csi.CreateVolumeRequest{
Name: testCSIVolume,
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
},
Parameters: map[string]string{
paramServer: testServer,
paramShare: testBaseDir,
},
},
resp: &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: testVolumeId,
VolumeContext: map[string]string{
paramServer: testServer,
paramShare: testShare,
},
},
},
},
{
name: "name empty",
req: &csi.CreateVolumeRequest{
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
},
Parameters: map[string]string{
paramServer: testServer,
paramShare: testBaseDir,
},
},
expectErr: true,
},
{
name: "invalid volume capability",
req: &csi.CreateVolumeRequest{
Name: testCSIVolume,
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
},
Parameters: map[string]string{
paramServer: testServer,
paramShare: testBaseDir,
},
},
expectErr: true,
},
{
name: "invalid create context",
req: &csi.CreateVolumeRequest{
Name: testCSIVolume,
VolumeCapabilities: []*csi.VolumeCapability{
{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
},
},
Parameters: map[string]string{
"unknown-parameter": "foo",
},
},
expectErr: true,
},
}
for _, test := range cases {
test := test //pin
t.Run(test.name, func(t *testing.T) {
// Setup
cs := initTestController(t)
// Run
resp, err := cs.CreateVolume(context.TODO(), test.req)
// Verify
if !test.expectErr && err != nil {
t.Errorf("test %q failed: %v", test.name, err)
}
if test.expectErr && err == nil {
t.Errorf("test %q failed; got success", test.name)
}
if !reflect.DeepEqual(resp, test.resp) {
t.Errorf("test %q failed: got resp %+v, expected %+v", test.name, resp, test.resp)
}
if !test.expectErr {
info, err := os.Stat(filepath.Join(cs.workingMountDir, test.req.Name, test.req.Name))
if err != nil {
t.Errorf("test %q failed: couldn't find volume subdirectory: %v", test.name, err)
}
if !info.IsDir() {
t.Errorf("test %q failed: subfile not a directory", test.name)
}
}
})
}
}

View File

@ -39,6 +39,13 @@ type nfsDriver struct {
const (
driverName = "nfs.csi.k8s.io"
// Address of the NFS server
paramServer = "server"
// Base directory of the NFS server to create volumes under.
// The base directory must be a direct child of the root directory.
// The root directory is ommitted from the string, for example:
// "base" instead of "/base"
paramShare = "share"
)
var (
@ -69,8 +76,9 @@ func NewNFSdriver(nodeID, endpoint string, perm *uint32) *nfsDriver {
// NFS plugin does not support ControllerServiceCapability now.
// If support is added, it should set to appropriate
// ControllerServiceCapability RPC types.
n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_UNKNOWN})
n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
})
return n
}

View File

@ -27,6 +27,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/utils/mount"
)
@ -69,8 +70,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
mo = append(mo, "ro")
}
s := req.GetVolumeContext()["server"]
ep := req.GetVolumeContext()["share"]
s := req.GetVolumeContext()[paramServer]
ep := req.GetVolumeContext()[paramShare]
source := fmt.Sprintf("%s:%s", s, ep)
err = ns.mounter.Mount(source, targetPath, "nfs", mo)
@ -161,3 +162,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
func (ns *nodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func getVolumeMetrics(volumePath string) (*volume.Metrics, error) {
return volume.NewMetricsStatFS(volumePath).GetMetrics()
}

View File

@ -2,12 +2,13 @@ package nfs
import (
"fmt"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/glog"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"golang.org/x/net/context"
"google.golang.org/grpc"
"strings"
)
func NewDefaultIdentityServer(d *nfsDriver) *IdentityServer {

View File

@ -34,6 +34,8 @@ function install_csi_sanity_bin {
function provision_nfs_server {
echo 'Installing NFS server on localhost'
apt-get update -y
apt-get install -y nfs-common
docker run -d --name nfs --privileged -p 2049:2049 -v $(pwd):/nfsshare -e SHARED_DIRECTORY=/nfsshare itsthenetwork/nfs-server-alpine:latest
}
@ -50,4 +52,4 @@ bin/nfsplugin --endpoint "$endpoint" --nodeid "$nodeid" -v=5 &
echo 'Begin to run sanity test...'
readonly CSI_SANITY_BIN='csi-test/cmd/csi-sanity/csi-sanity'
"$CSI_SANITY_BIN" --ginkgo.v --ginkgo.noColor --csi.testvolumeparameters="$(pwd)/test/sanity/params.yaml" --csi.endpoint="$endpoint" --ginkgo.skip="should not fail when requesting to create a volume with already existing name and same capacity|ValidateVolumeCapabilities|ControllerGetCapabilities|should work"
"$CSI_SANITY_BIN" --ginkgo.v --ginkgo.noColor --csi.testvolumeparameters="$(pwd)/test/sanity/params.yaml" --csi.endpoint="$endpoint" --ginkgo.skip="should not fail when requesting to create a volume with already existing name and same capacity|should fail when requesting to create a volume with already existing name and different capacity|ValidateVolumeCapabilities|ControllerGetCapabilities|should work"