393 lines
13 KiB
Go
393 lines
13 KiB
Go
/*
|
|
Copyright 2020 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package nfs
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"k8s.io/klog/v2"
|
|
)
|
|
|
|
// ControllerServer controller server setting
|
|
type ControllerServer struct {
|
|
Driver *Driver
|
|
// Working directory for the provisioner to temporarily mount nfs shares at
|
|
workingMountDir string
|
|
}
|
|
|
|
// nfsVolume is an internal representation of a volume
|
|
// created by the provisioner.
|
|
type nfsVolume struct {
|
|
// Volume id
|
|
id string
|
|
// Address of the NFS server.
|
|
// Matches paramServer.
|
|
server string
|
|
// Base directory of the NFS server to create volumes under
|
|
// Matches paramShare.
|
|
baseDir string
|
|
// Subdirectory of the NFS server to create volumes under
|
|
subDir string
|
|
// size of volume
|
|
size int64
|
|
}
|
|
|
|
// Ordering of elements in the CSI volume id.
|
|
// ID is of the form {server}/{baseDir}/{subDir}.
|
|
// TODO: This volume id format limits baseDir and
|
|
// subDir to only be one directory deep.
|
|
// Adding a new element should always go at the end
|
|
// before totalIDElements
|
|
const (
|
|
idServer = iota
|
|
idBaseDir
|
|
idSubDir
|
|
totalIDElements // Always last
|
|
)
|
|
|
|
// CreateVolume create a volume
|
|
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
|
name := req.GetName()
|
|
if len(name) == 0 {
|
|
return nil, status.Error(codes.InvalidArgument, "CreateVolume name must be provided")
|
|
}
|
|
if err := cs.validateVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
|
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
reqCapacity := req.GetCapacityRange().GetRequiredBytes()
|
|
nfsVol, err := cs.newNFSVolume(name, reqCapacity, req.GetParameters())
|
|
if err != nil {
|
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
|
|
var volCap *csi.VolumeCapability
|
|
if len(req.GetVolumeCapabilities()) > 0 {
|
|
volCap = req.GetVolumeCapabilities()[0]
|
|
}
|
|
// Mount nfs base share so we can create a subdirectory
|
|
if err = cs.internalMount(ctx, nfsVol, volCap); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
|
|
}
|
|
defer func() {
|
|
if err = cs.internalUnmount(ctx, nfsVol); err != nil {
|
|
klog.Warningf("failed to unmount nfs server: %v", err.Error())
|
|
}
|
|
}()
|
|
|
|
fileMode := os.FileMode(0777)
|
|
if cs.Driver.perm != nil {
|
|
fileMode = os.FileMode(*cs.Driver.perm)
|
|
}
|
|
// Create subdirectory under base-dir
|
|
internalVolumePath := cs.getInternalVolumePath(nfsVol)
|
|
if err = os.Mkdir(internalVolumePath, fileMode); err != nil && !os.IsExist(err) {
|
|
return nil, status.Errorf(codes.Internal, "failed to make subdirectory: %v", err.Error())
|
|
}
|
|
// Reset directory permissions because of umask problems
|
|
if err = os.Chmod(internalVolumePath, fileMode); err != nil {
|
|
klog.Warningf("failed to chmod subdirectory: %v", err.Error())
|
|
}
|
|
return &csi.CreateVolumeResponse{Volume: cs.nfsVolToCSI(nfsVol)}, nil
|
|
}
|
|
|
|
// DeleteVolume delete a volume
|
|
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
|
volumeID := req.GetVolumeId()
|
|
if volumeID == "" {
|
|
return nil, status.Error(codes.InvalidArgument, "volume id is empty")
|
|
}
|
|
nfsVol, err := cs.getNfsVolFromID(volumeID)
|
|
if err != nil {
|
|
// An invalid ID should be treated as doesn't exist
|
|
klog.Warningf("failed to get nfs volume for volume id %v deletion: %v", volumeID, err)
|
|
return &csi.DeleteVolumeResponse{}, nil
|
|
}
|
|
|
|
var volCap *csi.VolumeCapability
|
|
mountOptions := getMountOptions(req.GetSecrets())
|
|
if mountOptions != "" {
|
|
klog.V(2).Infof("DeleteVolume: found mountOptions(%s) for volume(%s)", mountOptions, volumeID)
|
|
volCap = &csi.VolumeCapability{
|
|
AccessType: &csi.VolumeCapability_Mount{
|
|
Mount: &csi.VolumeCapability_MountVolume{
|
|
MountFlags: []string{mountOptions},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
// Mount nfs base share so we can delete the subdirectory
|
|
if err = cs.internalMount(ctx, nfsVol, volCap); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to mount nfs server: %v", err.Error())
|
|
}
|
|
defer func() {
|
|
if err = cs.internalUnmount(ctx, nfsVol); err != nil {
|
|
klog.Warningf("failed to unmount nfs server: %v", err.Error())
|
|
}
|
|
}()
|
|
|
|
// Delete subdirectory under base-dir
|
|
internalVolumePath := cs.getInternalVolumePath(nfsVol)
|
|
|
|
klog.V(2).Infof("Removing subdirectory at %v", internalVolumePath)
|
|
if err = os.RemoveAll(internalVolumePath); err != nil {
|
|
return nil, status.Errorf(codes.Internal, "failed to delete subdirectory: %v", err.Error())
|
|
}
|
|
|
|
return &csi.DeleteVolumeResponse{}, nil
|
|
}
|
|
|
|
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) ControllerGetVolume(ctx context.Context, req *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
|
|
if len(req.GetVolumeId()) == 0 {
|
|
return nil, status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
|
}
|
|
if req.GetVolumeCapabilities() == nil {
|
|
return nil, status.Error(codes.InvalidArgument, "Volume capabilities missing in request")
|
|
}
|
|
|
|
// supports all AccessModes, no need to check capabilities here
|
|
return &csi.ValidateVolumeCapabilitiesResponse{Message: ""}, nil
|
|
}
|
|
|
|
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
// ControllerGetCapabilities implements the default GRPC callout.
|
|
// Default supports all capabilities
|
|
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
|
|
return &csi.ControllerGetCapabilitiesResponse{
|
|
Capabilities: cs.Driver.cscap,
|
|
}, nil
|
|
}
|
|
|
|
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
|
|
return nil, status.Error(codes.Unimplemented, "")
|
|
}
|
|
|
|
func (cs *ControllerServer) validateVolumeCapabilities(caps []*csi.VolumeCapability) error {
|
|
if len(caps) == 0 {
|
|
return fmt.Errorf("volume capabilities must be provided")
|
|
}
|
|
|
|
for _, c := range caps {
|
|
if err := cs.validateVolumeCapability(c); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cs *ControllerServer) validateVolumeCapability(c *csi.VolumeCapability) error {
|
|
if c == nil {
|
|
return fmt.Errorf("volume capability must be provided")
|
|
}
|
|
|
|
// Validate access mode
|
|
accessMode := c.GetAccessMode()
|
|
if accessMode == nil {
|
|
return fmt.Errorf("volume capability access mode not set")
|
|
}
|
|
if !cs.Driver.cap[accessMode.Mode] {
|
|
return fmt.Errorf("driver does not support access mode: %v", accessMode.Mode.String())
|
|
}
|
|
|
|
// Validate access type
|
|
accessType := c.GetAccessType()
|
|
if accessType == nil {
|
|
return fmt.Errorf("volume capability access type not set")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Mount nfs server at base-dir
|
|
func (cs *ControllerServer) internalMount(ctx context.Context, vol *nfsVolume, volCap *csi.VolumeCapability) error {
|
|
sharePath := filepath.Join(string(filepath.Separator) + vol.baseDir)
|
|
targetPath := cs.getInternalMountPath(vol)
|
|
|
|
if volCap == nil {
|
|
volCap = &csi.VolumeCapability{
|
|
AccessType: &csi.VolumeCapability_Mount{
|
|
Mount: &csi.VolumeCapability_MountVolume{},
|
|
},
|
|
}
|
|
}
|
|
|
|
klog.V(4).Infof("internally mounting %v:%v at %v", vol.server, sharePath, targetPath)
|
|
_, err := cs.Driver.ns.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
|
|
TargetPath: targetPath,
|
|
VolumeContext: map[string]string{
|
|
paramServer: vol.server,
|
|
paramShare: sharePath,
|
|
},
|
|
VolumeCapability: volCap,
|
|
VolumeId: vol.id,
|
|
})
|
|
return err
|
|
}
|
|
|
|
// Unmount nfs server at base-dir
|
|
func (cs *ControllerServer) internalUnmount(ctx context.Context, vol *nfsVolume) error {
|
|
targetPath := cs.getInternalMountPath(vol)
|
|
|
|
// Unmount nfs server at base-dir
|
|
klog.V(4).Infof("internally unmounting %v", targetPath)
|
|
_, err := cs.Driver.ns.NodeUnpublishVolume(ctx, &csi.NodeUnpublishVolumeRequest{
|
|
VolumeId: vol.id,
|
|
TargetPath: cs.getInternalMountPath(vol),
|
|
})
|
|
return err
|
|
}
|
|
|
|
// Convert VolumeCreate parameters to an nfsVolume
|
|
func (cs *ControllerServer) newNFSVolume(name string, size int64, params map[string]string) (*nfsVolume, error) {
|
|
var (
|
|
server string
|
|
baseDir string
|
|
)
|
|
|
|
// validate parameters (case-insensitive)
|
|
for k, v := range params {
|
|
switch strings.ToLower(k) {
|
|
case paramServer:
|
|
server = v
|
|
case paramShare:
|
|
baseDir = v
|
|
default:
|
|
return nil, fmt.Errorf("invalid parameter %q", k)
|
|
}
|
|
}
|
|
|
|
// validate required parameters
|
|
if server == "" {
|
|
return nil, fmt.Errorf("%v is a required parameter", paramServer)
|
|
}
|
|
if baseDir == "" {
|
|
return nil, fmt.Errorf("%v is a required parameter", paramShare)
|
|
}
|
|
|
|
vol := &nfsVolume{
|
|
server: server,
|
|
baseDir: baseDir,
|
|
subDir: name,
|
|
size: size,
|
|
}
|
|
vol.id = cs.getVolumeIDFromNfsVol(vol)
|
|
|
|
return vol, nil
|
|
}
|
|
|
|
// Get working directory for CreateVolume and DeleteVolume
|
|
func (cs *ControllerServer) getInternalMountPath(vol *nfsVolume) string {
|
|
// use default if empty
|
|
if cs.workingMountDir == "" {
|
|
cs.workingMountDir = "/tmp"
|
|
}
|
|
return filepath.Join(cs.workingMountDir, vol.subDir)
|
|
}
|
|
|
|
// Get internal path where the volume is created
|
|
// The reason why the internal path is "workingDir/subDir/subDir" is because:
|
|
// * the semantic is actually "workingDir/volId/subDir" and volId == subDir.
|
|
// * we need a mount directory per volId because you can have multiple
|
|
// CreateVolume calls in parallel and they may use the same underlying share.
|
|
// Instead of refcounting how many CreateVolume calls are using the same
|
|
// share, it's simpler to just do a mount per request.
|
|
func (cs *ControllerServer) getInternalVolumePath(vol *nfsVolume) string {
|
|
return filepath.Join(cs.getInternalMountPath(vol), vol.subDir)
|
|
}
|
|
|
|
// Get user-visible share path for the volume
|
|
func (cs *ControllerServer) getVolumeSharePath(vol *nfsVolume) string {
|
|
return filepath.Join(string(filepath.Separator), vol.baseDir, vol.subDir)
|
|
}
|
|
|
|
// Convert into nfsVolume into a csi.Volume
|
|
func (cs *ControllerServer) nfsVolToCSI(vol *nfsVolume) *csi.Volume {
|
|
return &csi.Volume{
|
|
CapacityBytes: 0, // by setting it to zero, Provisioner will use PVC requested size as PV size
|
|
VolumeId: vol.id,
|
|
VolumeContext: map[string]string{
|
|
paramServer: vol.server,
|
|
paramShare: cs.getVolumeSharePath(vol),
|
|
},
|
|
}
|
|
}
|
|
|
|
// Given a nfsVolume, return a CSI volume id
|
|
func (cs *ControllerServer) getVolumeIDFromNfsVol(vol *nfsVolume) string {
|
|
idElements := make([]string, totalIDElements)
|
|
idElements[idServer] = strings.Trim(vol.server, "/")
|
|
idElements[idBaseDir] = strings.Trim(vol.baseDir, "/")
|
|
idElements[idSubDir] = strings.Trim(vol.subDir, "/")
|
|
return strings.Join(idElements, "/")
|
|
}
|
|
|
|
// Given a CSI volume id, return a nfsVolume
|
|
func (cs *ControllerServer) getNfsVolFromID(id string) (*nfsVolume, error) {
|
|
volRegex := regexp.MustCompile("^([^/]+)/(.*)/([^/]+)$")
|
|
tokens := volRegex.FindStringSubmatch(id)
|
|
if tokens == nil {
|
|
return nil, fmt.Errorf("Could not split %q into server, baseDir and subDir", id)
|
|
}
|
|
|
|
return &nfsVolume{
|
|
id: id,
|
|
server: tokens[1],
|
|
baseDir: tokens[2],
|
|
subDir: tokens[3],
|
|
}, nil
|
|
}
|