10000 Release/v1.88.0 by lx1036 · Pull Request #258 · lx1036/code · GitHub
[go: up one dir, main page]

Skip to content

Release/v1.88.0 #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
(ipvs)ipvs for L4 loadbalancer
  • Loading branch information
lx1036 committed May 29, 2022
commit f2777c737a4ac7f0ee5420c2d935d2cad85579ef
< 8000 th scope="col">Diff line number
Original file line number Diff line change
Expand Up @@ -23,16 +23,15 @@ func (controller *NetworkServiceController) onEndpointUpdate(endpoint *corev1.En
if err != nil {
return
}

svc, exist, err := controller.svcLister.GetByKey(key)
if err != nil {
klog.Errorf(fmt.Sprintf("failed to get svc %s err: %v", key, err))
return
}
if !exist { // ignore endpoint has no service
if !exist { // ignore endpoint has no service, if service is deleted, syncService handle it.
return
}
if IsHeadlessService(svc.(*corev1.Service)) {
if IsHeadlessService(svc.(*corev1.Service)) || IsExternalNameService(svc.(*corev1.Service)) {
return
}

Expand Down Expand Up @@ -103,6 +102,16 @@ func (controller *NetworkServiceController) buildEndpointInfo() endpointInfoMap
return endpointsMap
}

func hasLocalEndpoint(endpoints []endpointInfo) bool {
for _, endpoint := range endpoints {
if endpoint.isLocal {
return true
}
}

return false
}

func isEndpointsForLeaderElection(ep *corev1.Endpoints) bool {
_, isLeaderElection := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]
return isLeaderElection
Expand Down
51 changes: 35 additions & 16 deletions go/k8s/network/loadbalancer/kube-proxy/ipvs/pkg/controller/ipvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"sync"
)

// INFO: @see https://github.com/kubernetes/kubernetes/blob/master/pkg/util/ipvs/ipvs_linux.go
// https://github.com/cloudnativelabs/kube-router/blob/master/pkg/controllers/proxy/network_services_controller.go#L131-L140

const (
KubeDummyIf = "kube-dummy-if"
)
Expand Down Expand Up @@ -92,22 +95,20 @@ func (ln *linuxNetworking) EnsureAddressBind(link netlink.Link, ip string, addRo
return nil
}

func (ln *linuxNetworking) AddOrUpdateVirtualServer(svcInfo serviceInfo) error {
func (ln *linuxNetworking) AddOrUpdateVirtualServer(ipvsSvc *ipvs.Service) error {
ln.Lock()
defer ln.Unlock()

ipvsSvc := toIPVSService(svcInfo)
oldIpvsSvc, _ := ln.ipvsHandle.GetService(ipvsSvc)

if oldIpvsSvc == nil || !equalIPVSService(oldIpvsSvc, ipvsSvc) {
if oldIpvsSvc == nil {
if err := ln.AddVirtualServer(svcInfo); err != nil {
klog.Errorf(fmt.Sprintf("Add new ipvs service %s/%s for virtual server err: %v", svcInfo.namespace, svcInfo.name, err))
if err := ln.AddVirtualServer(ipvsSvc); err != nil {
klog.Errorf(fmt.Sprintf("Add new ipvs service for virtual server err: %v", err))
return err
}
} else {
if err := ln.UpdateVirtualServer(svcInfo); err != nil {
klog.Errorf(fmt.Sprintf("Edit existed ipvs service %s/%s for virtual server err: %v", svcInfo.namespace, svcInfo.name, err))
if err := ln.UpdateVirtualServer(ipvsSvc); err != nil {
klog.Errorf(fmt.Sprintf("Edit existed ipvs service for virtual server err: %v", err))
return err
}
}
Expand All @@ -116,11 +117,10 @@ func (ln *linuxNetworking) AddOrUpdateVirtualServer(svcInfo serviceInfo) error {
return nil
}

func (ln *linuxNetworking) GetVirtualServer(svcInfo serviceInfo) (*ipvs.Service, error) {
func (ln *linuxNetworking) GetVirtualServer(ipvsSvc *ipvs.Service) (*ipvs.Service, error) {
ln.Lock()
defer ln.Unlock()

ipvsSvc := toIPVSService(svcInfo)
oldIpvsSvc, err := ln.ipvsHandle.GetService(ipvsSvc)
if err != nil {
return nil, err
Expand All @@ -129,20 +129,18 @@ func (ln *linuxNetworking) GetVirtualServer(svcInfo serviceInfo) (*ipvs.Service,
}

// AddVirtualServer `ipvsadm --add-service xxx`
func (ln *linuxNetworking) AddVirtualServer(svcInfo serviceInfo) error {
func (ln *linuxNetworking) AddVirtualServer(ipvsSvc *ipvs.Service) error {
ln.Lock()
defer ln.Unlock()

ipvsSvc := toIPVSService(svcInfo)
return ln.ipvsHandle.NewService(ipvsSvc)
}

// UpdateVirtualServer `ipvsadm --edit-service xxx`
func (ln *linuxNetworking) UpdateVirtualServer(svcInfo serviceInfo) error {
func (ln *linuxNetworking) UpdateVirtualServer(ipvsSvc *ipvs.Service) error {
ln.Lock()
defer ln.Unlock()

ipvsSvc := toIPVSService(svcInfo)
return ln.ipvsHandle.UpdateService(ipvsSvc)
}

Expand All @@ -167,11 +165,32 @@ func (ln *linuxNetworking) DelRealServer(ipvsSvc *ipvs.Service, dst *ipvs.Destin
return ln.ipvsHandle.DelDestination(ipvsSvc, dst)
}

func toIPVSService(svcInfo serviceInfo) *ipvs.Service {
func clusterIPToIPVSService(svcInfo serviceInfo) *ipvs.Service {
ipvsSvc := &ipvs.Service{
Address: svcInfo.address, // clusterIP
Protocol: stringToProtocol(svcInfo.protocol),
Port: uint16(svcInfo.port), // clusterIP
SchedName: svcInfo.scheduler,
Flags: svcInfo.flags,
Timeout: svcInfo.sessionAffinityTimeoutSeconds,
}

if ip4 := svcInfo.address.To4(); ip4 != nil {
ipvsSvc.AddressFamily = unix.AF_INET
ipvsSvc.Netmask = 0xffffffff
} else {
ipvsSvc.AddressFamily = unix.AF_INET6
ipvsSvc.Netmask = 128
}

return ipvsSvc
}

func nodePortToIPVSService(svcInfo serviceInfo) *ipvs.Service {
ipvsSvc := &ipvs.Service{
Address: svcInfo.address,
Address: NodeIP, // nodePort
Protocol: stringToProtocol(svcInfo.protocol),
Port: uint16(svcInfo.port),
Port: uint16(svcInfo.nodePort), // nodePort
SchedName: svcInfo.scheduler,
Flags: svcInfo.flags,
Timeout: svcInfo.sessionAffinityTimeoutSeconds,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,17 @@ type NetworkServiceController struct {

ln *linuxNetworking

nodeIP net.IP
nodeName string
nodeIP net.IP
nodeName string
readyForUpdates bool
syncPeriod time.Duration // The delay between ipvs config synchronizations (e.g. '5s', '1m', '2h22m')

svcLister cache.Indexer
epLister cache.Indexer
serviceMap serviceInfoMap
endpointMap endpointInfoMap

syncChan chan struct{}
stopCh chan struct{}
}

func NewNetworkPolicyController(
Expand All @@ -52,7 +53,8 @@ func NewNetworkPolicyController(
}

c := &NetworkServiceController{
ln: ln,
ln: ln,
syncPeriod: time.Minute,

svcLister: svcInformer.GetIndexer(),
epLister: epInformer.GetIndexer(),
Expand Down Expand Up @@ -171,13 +173,26 @@ func NewNetworkPolicyController(
return c, nil
}

func (controller *NetworkServiceController) Run() {
func (controller *NetworkServiceController) Run(stopCh <-chan struct{}) {
t := time.NewTicker(controller.syncPeriod)
defer t.Stop()

select {
case <-stopCh:
klog.Info("Shutting down network services controller")
return
default:
err := controller.doSync()
if err != nil {
klog.Fatalf(fmt.Sprintf("Failed to perform initial full sync %v", err))
}
controller.readyForUpdates = true
}

for {
select {
case <-controller.stopCh:
case <-stopCh:
controller.readyForUpdates = false
return

case <-controller.syncChan:
Expand All @@ -191,7 +206,7 @@ func (controller *NetworkServiceController) Run() {
controller.Unlock()

case <-t.C:
controller.syncAll()
controller.doSync()
}
}
}
Expand All @@ -207,9 +222,13 @@ func (controller *NetworkServiceController) sync() {
// INFO: 因为 delete 事件也是 onServiceUpdate() 使用的 svcLister.List(),没有考虑 delete 事件,所以需要整体全部捞一遍来 cleanup
func (controller *NetworkServiceController) syncService(serviceMap serviceInfoMap, endpointMap endpointInf A92E oMap) {
err := controller.syncClusterIPService(serviceMap, endpointMap)
controller.syncNodePortService(serviceMap, endpointMap)

//controller.cleanup
if err != nil {
return
}
err = controller.syncNodePortService(serviceMap, endpointMap)
if err != nil {
return
}

}

Expand All @@ -225,13 +244,17 @@ func (controller *NetworkServiceController) syncClusterIPService(serviceMap serv

for svcID, svcInfo := range serviceMap {
endpoints := endpointMap[svcID]
if svcInfo.isLocal && !hasLocalEndpoint(endpoints) { // ClusterIP service has no local eps, skip create ipvs vs/rs at current node
continue
}

// (1)add ipvs service virtual server
if err := controller.ln.AddOrUpdateVirtualServer(*svcInfo); err != nil {
ipvsSvc := clusterIPToIPVSService(*svcInfo)
if err := controller.ln.AddOrUpdateVirtualServer(ipvsSvc); err != nil {
klog.Errorf(fmt.Sprintf("[syncClusterIPService]AddOrUpdateVirtualServer for %s/%s err:%v", svcInfo.namespace, svcInfo.name, err))
continue
}
ipvsSvc, err := controller.ln.GetVirtualServer(*svcInfo)
ipvsSvc, err = controller.ln.GetVirtualServer(ipvsSvc)
if err != nil {
klog.Errorf(fmt.Sprintf("failed to get ipvs service %s/%s err: %v", svcInfo.namespace, svcInfo.name, err))
continue
Expand All @@ -243,51 +266,97 @@ func (controller *NetworkServiceController) syncClusterIPService(serviceMap serv
continue
}

// (3) add/delete ipvs real server
destinations, err := controller.ln.ListRealServer(ipvsSvc)
controller.syncEndpoints(svcInfo, ipvsSvc, endpoints)
}

return nil
}

func (controller *NetworkServiceController) syncNodePortService(serviceMap serviceInfoMap, endpointMap endpointInfoMap) error {
dummyVipInterface, err := controller.ln.EnsureDummyDevice()
if err != nil {
return errors.New("Failed creating dummy interface: " + err.Error())
}

for svcID, svcInfo := range serviceMap {
endpoints := endpointMap[svcID]
if svcInfo.nodePort == 0 { // service is not NodePort
continue
}

if svcInfo.isLocal && !hasLocalEndpoint(endpoints) { // NodePort service has no local eps, skip create ipvs vs/rs at current node
continue
}

// (1)add ipvs service virtual server
ipvsSvc := nodePortToIPVSService(*svcInfo)
if err := controller.ln.AddOrUpdateVirtualServer(ipvsSvc); err != nil {
klog.Errorf(fmt.Sprintf("[syncClusterIPService]AddOrUpdateVirtualServer for %s/%s err:%v", svcInfo.namespace, svcInfo.name, err))
continue
}
ipvsSvc, err = controller.ln.GetVirtualServer(ipvsSvc)
if err != nil {
klog.Errorf(fmt.Sprintf("failed to get ipvs service %s/%s err: %v", svcInfo.namespace, svcInfo.name, err))
continue
}

// (2) assign clusterIP to dummy interface, add route in local table
err = controller.ln.EnsureAddressBind(dummyVipInterface, svcInfo.address.String(), true)
if err != nil {
continue
}
oldDst := make(map[string]*ipvs.Destination)
for _, destination := range destinations {
key := fmt.Sprintf("%s:%d", destination.Address.String(), destination.Port)
oldDst[key] = destination

controller.syncEndpoints(svcInfo, ipvsSvc, endpoints)
}

return nil
}

func (controller *NetworkServiceController) syncEndpoints(svcInfo *serviceInfo, ipvsSvc *ipvs.Service, endpoints []endpointInfo) {
// (3) add/delete ipvs real server
destinations, err := controller.ln.ListRealServer(ipvsSvc)
if err != nil {
klog.Errorf(fmt.Sprintf("failed to list real server for svc %s/%s err: %v", svcInfo.namespace, svcInfo.name, err))
return
}
oldDst := make(map[string]*ipvs.Destination)
for _, destination := range destinations {
key := fmt.Sprintf("%s:%d", destination.Address.String(), destination.Port)
oldDst[key] = destination
}
newDst := make(map[string]endpointInfo)
for _, endpoint := range endpoints {
if svcInfo.isLocal && !endpoint.isLocal { // skip if endpoint is not externalTrafficPolicyLocal
continue
}
newDst := make(map[string]endpointInfo)
for _, endpoint := range endpoints {
if svcInfo.isLocal && !endpoint.isLocal {
continue
key := fmt.Sprintf("%s:%d", endpoint.address.String(), endpoint.port)
newDst[key] = endpoint
}
for key, info := range newDst { // add new ipvs real server
_, ok := oldDst[key]
if !ok {
dst := ipvs.Destination{
Address: info.address,
AddressFamily: syscall.AF_INET,
Port: uint16(info.port),
Weight: 1,
}
key := fmt.Sprintf("%s:%d", endpoint.address.String(), endpoint.port)
newDst[key] = endpoint
}
for key, info := range newDst { // add new ipvs real server
_, ok := oldDst[key]
if !ok {
dst := ipvs.Destination{
Address: info.address,
AddressFamily: syscall.AF_INET,
Port: uint16(info.port),
Weight: 1,
}
if err = controller.ln.AddRealServer(ipvsSvc, &dst); err != nil {
klog.Errorf(fmt.Sprintf("failed to add new ipvs real server err: %v", err))
continue
}
if err = controller.ln.AddRealServer(ipvsSvc, &dst); err != nil {
klog.Errorf(fmt.Sprintf("failed to add new ipvs real server err: %v", err))
continue
}
}
for key, destination := range oldDst { // delete old ipvs real server
_, ok := newDst[key]
if !ok {
if err = controller.ln.DelRealServer(ipvsSvc, destination); err != nil {
klog.Errorf(fmt.Sprintf("failed to add new ipvs real server err: %v", err))
continue
}
}
for key, destination := range oldDst { // delete old ipvs real server
_, ok := newDst[key]
if !ok {
if err = controller.ln.DelRealServer(ipvsSvc, destination); err != nil {
klog.Errorf(fmt.Sprintf("failed to add new ipvs real server err: %v", err))
continue
}
}
}

return nil
}

// GetNodeIP returns the most valid external facing IP address for a node.
Expand Down
Loading
0