remove csi-common dependencies
This commit is contained in:
parent
e88a4ea5d4
commit
cc481ceade
@ -65,6 +65,6 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func handle() {
|
func handle() {
|
||||||
d := nfs.NewDriver(nodeID, endpoint)
|
d := nfs.NewNFSdriver(nodeID, endpoint)
|
||||||
d.Run()
|
d.Run()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,22 +2,66 @@ package nfs
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
"github.com/golang/glog"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ControllerServer struct {
|
type ControllerServer struct {
|
||||||
*csicommon.DefaultControllerServer
|
Driver *nfsDriver
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
|
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||||
return nil, status.Error(codes.Unimplemented, "")
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func getControllerServer(csiDriver *csicommon.CSIDriver) ControllerServer {
|
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
|
||||||
return ControllerServer{
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
csicommon.NewDefaultControllerServer(csiDriver),
|
}
|
||||||
}
|
|
||||||
|
func (cs *ControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) ListVolumes(ctx context.Context, req *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) GetCapacity(ctx context.Context, req *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
// ControllerGetCapabilities implements the default GRPC callout.
|
||||||
|
// Default supports all capabilities
|
||||||
|
func (cs *ControllerServer) ControllerGetCapabilities(ctx context.Context, req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
|
||||||
|
glog.V(5).Infof("Using default ControllerGetCapabilities")
|
||||||
|
|
||||||
|
return &csi.ControllerGetCapabilitiesResponse{
|
||||||
|
Capabilities: cs.Driver.cscap,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,78 +0,0 @@
|
|||||||
/*
|
|
||||||
Copyright 2017 The Kubernetes Authors.
|
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
you may not use this file except in compliance with the License.
|
|
||||||
You may obtain a copy of the License at
|
|
||||||
|
|
||||||
http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
|
|
||||||
Unless required by applicable law or agreed to in writing, software
|
|
||||||
distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
See the License for the specific language governing permissions and
|
|
||||||
limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package nfs
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
|
||||||
"github.com/golang/glog"
|
|
||||||
|
|
||||||
csicommon "github.com/kubernetes-csi/drivers/pkg/csi-common"
|
|
||||||
)
|
|
||||||
|
|
||||||
type driver struct {
|
|
||||||
csiDriver *csicommon.CSIDriver
|
|
||||||
endpoint string
|
|
||||||
|
|
||||||
//ids *identityServer
|
|
||||||
ns *nodeServer
|
|
||||||
cap []*csi.VolumeCapability_AccessMode
|
|
||||||
cscap []*csi.ControllerServiceCapability
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
driverName = "csi-nfsplugin"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
version = "1.0.0-rc2"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewDriver(nodeID, endpoint string) *driver {
|
|
||||||
glog.Infof("Driver: %v version: %v", driverName, version)
|
|
||||||
|
|
||||||
d := &driver{}
|
|
||||||
|
|
||||||
d.endpoint = endpoint
|
|
||||||
|
|
||||||
csiDriver := csicommon.NewCSIDriver(driverName, version, nodeID)
|
|
||||||
csiDriver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
|
|
||||||
// NFS plugin does not support ControllerServiceCapability now.
|
|
||||||
// If support is added, it should set to appropriate
|
|
||||||
// ControllerServiceCapability RPC types.
|
|
||||||
csiDriver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_UNKNOWN})
|
|
||||||
|
|
||||||
d.csiDriver = csiDriver
|
|
||||||
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewNodeServer(d *driver) *nodeServer {
|
|
||||||
return &nodeServer{
|
|
||||||
DefaultNodeServer: csicommon.NewDefaultNodeServer(d.csiDriver),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *driver) Run() {
|
|
||||||
s := csicommon.NewNonBlockingGRPCServer()
|
|
||||||
s.Start(d.endpoint,
|
|
||||||
csicommon.NewDefaultIdentityServer(d.csiDriver),
|
|
||||||
// NFS plugin has not implemented ControllerServer
|
|
||||||
// using default controllerserver.
|
|
||||||
getControllerServer(d.csiDriver),
|
|
||||||
NewNodeServer(d))
|
|
||||||
s.Wait()
|
|
||||||
}
|
|
||||||
49
pkg/nfs/indentityserver.go
Normal file
49
pkg/nfs/indentityserver.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package nfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc/codes"
|
||||||
|
"google.golang.org/grpc/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IdentityServer struct {
|
||||||
|
Driver *nfsDriver
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ids *IdentityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
|
||||||
|
glog.V(5).Infof("Using default GetPluginInfo")
|
||||||
|
|
||||||
|
if ids.Driver.name == "" {
|
||||||
|
return nil, status.Error(codes.Unavailable, "Driver name not configured")
|
||||||
|
}
|
||||||
|
|
||||||
|
if ids.Driver.version == "" {
|
||||||
|
return nil, status.Error(codes.Unavailable, "Driver is missing version")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &csi.GetPluginInfoResponse{
|
||||||
|
Name: ids.Driver.name,
|
||||||
|
VendorVersion: ids.Driver.version,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ids *IdentityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
|
||||||
|
return &csi.ProbeResponse{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ids *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
|
||||||
|
glog.V(5).Infof("Using default capabilities")
|
||||||
|
return &csi.GetPluginCapabilitiesResponse{
|
||||||
|
Capabilities: []*csi.PluginCapability{
|
||||||
|
{
|
||||||
|
Type: &csi.PluginCapability_Service_{
|
||||||
|
Service: &csi.PluginCapability_Service{
|
||||||
|
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
102
pkg/nfs/nfs.go
Normal file
102
pkg/nfs/nfs.go
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package nfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nfsDriver struct {
|
||||||
|
name string
|
||||||
|
nodeID string
|
||||||
|
version string
|
||||||
|
|
||||||
|
endpoint string
|
||||||
|
|
||||||
|
//ids *identityServer
|
||||||
|
ns *nodeServer
|
||||||
|
cap []*csi.VolumeCapability_AccessMode
|
||||||
|
cscap []*csi.ControllerServiceCapability
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
driverName = "csi-nfsplugin"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
version = "1.0.0-rc2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewNFSdriver(nodeID, endpoint string) *nfsDriver {
|
||||||
|
glog.Infof("Driver: %v version: %v", driverName, version)
|
||||||
|
|
||||||
|
n := &nfsDriver{
|
||||||
|
name: driverName,
|
||||||
|
version: version,
|
||||||
|
nodeID: nodeID,
|
||||||
|
endpoint: endpoint,
|
||||||
|
}
|
||||||
|
|
||||||
|
n.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER})
|
||||||
|
// NFS plugin does not support ControllerServiceCapability now.
|
||||||
|
// If support is added, it should set to appropriate
|
||||||
|
// ControllerServiceCapability RPC types.
|
||||||
|
n.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{csi.ControllerServiceCapability_RPC_UNKNOWN})
|
||||||
|
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNodeServer(n *nfsDriver) *nodeServer {
|
||||||
|
return &nodeServer{
|
||||||
|
Driver: n,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *nfsDriver) Run() {
|
||||||
|
s := NewNonBlockingGRPCServer()
|
||||||
|
s.Start(n.endpoint,
|
||||||
|
NewDefaultIdentityServer(n),
|
||||||
|
// NFS plugin has not implemented ControllerServer
|
||||||
|
// using default controllerserver.
|
||||||
|
NewControllerServer(n),
|
||||||
|
NewNodeServer(n))
|
||||||
|
s.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *nfsDriver) AddVolumeCapabilityAccessModes(vc []csi.VolumeCapability_AccessMode_Mode) []*csi.VolumeCapability_AccessMode {
|
||||||
|
var vca []*csi.VolumeCapability_AccessMode
|
||||||
|
for _, c := range vc {
|
||||||
|
glog.Infof("Enabling volume access mode: %v", c.String())
|
||||||
|
vca = append(vca, &csi.VolumeCapability_AccessMode{Mode: c})
|
||||||
|
}
|
||||||
|
n.cap = vca
|
||||||
|
return vca
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *nfsDriver) AddControllerServiceCapabilities(cl []csi.ControllerServiceCapability_RPC_Type) {
|
||||||
|
var csc []*csi.ControllerServiceCapability
|
||||||
|
|
||||||
|
for _, c := range cl {
|
||||||
|
glog.Infof("Enabling controller service capability: %v", c.String())
|
||||||
|
csc = append(csc, NewControllerServiceCapability(c))
|
||||||
|
}
|
||||||
|
|
||||||
|
n.cscap = csc
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
@ -18,11 +18,11 @@ package nfs
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/golang/glog"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
@ -30,7 +30,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type nodeServer struct {
|
type nodeServer struct {
|
||||||
*csicommon.DefaultNodeServer
|
Driver *nfsDriver
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
||||||
@ -98,6 +98,34 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ns *nodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
|
||||||
|
glog.V(5).Infof("Using default NodeGetInfo")
|
||||||
|
|
||||||
|
return &csi.NodeGetInfoResponse{
|
||||||
|
NodeId: ns.Driver.nodeID,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
|
||||||
|
glog.V(5).Infof("Using default NodeGetCapabilities")
|
||||||
|
|
||||||
|
return &csi.NodeGetCapabilitiesResponse{
|
||||||
|
Capabilities: []*csi.NodeServiceCapability{
|
||||||
|
{
|
||||||
|
Type: &csi.NodeServiceCapability_Rpc{
|
||||||
|
Rpc: &csi.NodeServiceCapability_RPC{
|
||||||
|
Type: csi.NodeServiceCapability_RPC_UNKNOWN,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ns *nodeServer) NodeGetVolumeStats(ctx context.Context, in *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
|
||||||
|
return nil, status.Error(codes.Unimplemented, "")
|
||||||
|
}
|
||||||
|
|
||||||
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
||||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
96
pkg/nfs/server.go
Normal file
96
pkg/nfs/server.go
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
package nfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Defines Non blocking GRPC server interfaces
|
||||||
|
type NonBlockingGRPCServer interface {
|
||||||
|
// Start services at the endpoint
|
||||||
|
Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer)
|
||||||
|
// Waits for the service to stop
|
||||||
|
Wait()
|
||||||
|
// Stops the service gracefully
|
||||||
|
Stop()
|
||||||
|
// Stops the service forcefully
|
||||||
|
ForceStop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNonBlockingGRPCServer() NonBlockingGRPCServer {
|
||||||
|
return &nonBlockingGRPCServer{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NonBlocking server
|
||||||
|
type nonBlockingGRPCServer struct {
|
||||||
|
wg sync.WaitGroup
|
||||||
|
server *grpc.Server
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
|
||||||
|
|
||||||
|
s.wg.Add(1)
|
||||||
|
|
||||||
|
go s.serve(endpoint, ids, cs, ns)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *nonBlockingGRPCServer) Wait() {
|
||||||
|
s.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *nonBlockingGRPCServer) Stop() {
|
||||||
|
s.server.GracefulStop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *nonBlockingGRPCServer) ForceStop() {
|
||||||
|
s.server.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
|
||||||
|
|
||||||
|
proto, addr, err := ParseEndpoint(endpoint)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if proto == "unix" {
|
||||||
|
addr = "/" + addr
|
||||||
|
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
|
||||||
|
glog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
listener, err := net.Listen(proto, addr)
|
||||||
|
if err != nil {
|
||||||
|
glog.Fatalf("Failed to listen: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []grpc.ServerOption{
|
||||||
|
grpc.UnaryInterceptor(logGRPC),
|
||||||
|
}
|
||||||
|
server := grpc.NewServer(opts...)
|
||||||
|
s.server = server
|
||||||
|
|
||||||
|
if ids != nil {
|
||||||
|
csi.RegisterIdentityServer(server, ids)
|
||||||
|
}
|
||||||
|
if cs != nil {
|
||||||
|
csi.RegisterControllerServer(server, cs)
|
||||||
|
}
|
||||||
|
if ns != nil {
|
||||||
|
csi.RegisterNodeServer(server, ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.Infof("Listening for connections on address: %#v", listener.Addr())
|
||||||
|
|
||||||
|
server.Serve(listener)
|
||||||
|
|
||||||
|
}
|
||||||
55
pkg/nfs/utils.go
Normal file
55
pkg/nfs/utils.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package nfs
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewDefaultIdentityServer(d *nfsDriver) *IdentityServer {
|
||||||
|
return &IdentityServer{
|
||||||
|
Driver: d,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewControllerServer(d *nfsDriver) *ControllerServer {
|
||||||
|
return &ControllerServer{
|
||||||
|
Driver: d,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewControllerServiceCapability(cap csi.ControllerServiceCapability_RPC_Type) *csi.ControllerServiceCapability {
|
||||||
|
return &csi.ControllerServiceCapability{
|
||||||
|
Type: &csi.ControllerServiceCapability_Rpc{
|
||||||
|
Rpc: &csi.ControllerServiceCapability_RPC{
|
||||||
|
Type: cap,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseEndpoint(ep string) (string, string, error) {
|
||||||
|
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
|
||||||
|
s := strings.SplitN(ep, "://", 2)
|
||||||
|
if s[1] != "" {
|
||||||
|
return s[0], s[1], nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
|
||||||
|
}
|
||||||
|
|
||||||
|
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||||
|
glog.V(3).Infof("GRPC call: %s", info.FullMethod)
|
||||||
|
glog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
|
||||||
|
resp, err := handler(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("GRPC error: %v", err)
|
||||||
|
} else {
|
||||||
|
glog.V(5).Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
|
||||||
|
}
|
||||||
|
return resp, err
|
||||||
|
}
|
||||||
Loading…
x
Reference in New Issue
Block a user