利用 CRD 实现一个 mini-k8s-proxy

前言

进入正文前,需要熟悉以下几个概念定义:

  • Custom Resources: 资源(Resource) 是 Kubernetes API 中的一个端点, 其中存储的是某个类别的 API 对象(Objects)的一个集合。例如内置的 pods 资源包含一组 Pod 对象。定制资源(Custom Resources)是对 Kubernetes API 的扩展,可以动态的在集群内安装删除,对用户而言,如同使用 pods 这种内置资源方式一样,可以使用 kubectl 来创建和访问定制资源里面的定制对象(Custom Objects)。
  • CustomResourceDefinition(CRD): CRD 定制资源定义是 Kubernetes 提供的向集群中添加定制资源(Custom Resources)的一种方式,允许用户定义定制资源。
  • Custom Controllers: 定制资源只能存取结构化的数据,而定制控制器(Custom Controllers)可以通过观测分析将结构化的数据解释为用户所期望状态的记录,并持续地维护该状态。它可以用于任何类别的资源,若定制资源与定制控制器相结合,定制资源就能够提供真正的声明式 API(Declarative API)。例如,Traefik 的 IngressRoute,apache apisix 的 apisix-ingress-controller ,以及本文接下来的示例等,都是通过编写 CRD 创建定制资源 + 使用 k8s.io/code-generator 实现定制控制器完成的。
  • Operator: Operator 是 Kubernetes 的扩展软件,可以理解为一种特殊的 Controller ,旨在捕获(正在管理一个或一组服务的)运维人员的关键目标。原理和 Controller 一样,都是通过编写 CRD 创建定制资源然后监听 CRD 相关变化并作出响应。区别就在于 Operator 可能会结合原生的 Controller(Deployment/Replicas),或者 Kubernetes 的网络,存储等来控制应用程序的状态。通俗点讲就是,Operator 是一个功能更强大,封装了对于特定应用程序的运维经验的 Controller 。例如,etcd operator,prometheus operator 等。

如果觉得上面的定义太枯燥了,那就由我来简单的总结一遍吧:

  • Kubernetes 里面呢,有很多内置的资源,资源中包括对象,例如 pods 资源包含着一组 Pod 对象
  • Kubernetes 为了方便用户,还允许用户创建自定义资源,用户创建出来的资源就取了个高大上的名字叫定制资源,同理,定制资源里有定制对象,例如 Traefik 里面的 ingressroutes 定制资源中包含着一组 IngressRoute 对象
  • 用户想要创建定制资源,可以通过编写 CustomResourceDefinition(CRD,定制资源定义)来实现
  • 用户创建完定制资源后,还可以编写代码来监听资源的创建、删除、更新动作,这就是一个 Controller 控制器了
  • 用户编写的控制器如果是专注于某个特定的应用程序,并赋予了特定的领域知识,就形成了一个 Operator

目标

实现一个可以通过配置 host 拦截到匹配的请求域名,将流量代理转发到具体的 service 中(通过配置 serviceName,namespace,port,scheme)的极简网络代理工具。其中,配置通过 CRD 创建,代理程序可以通过控制器监听配置变化,动态更新,无需重启。(PS:其实就是简单模拟了 Traefik IngressRoute 的实现)

创建 CRD

将下面的 CustomResourceDefinition 保存为 crd.yaml 文件

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  # 名字必需与下面的 spec 字段匹配,并且格式为 '<名称的复数形式>.<组名>'
  name: proxyroutes.miniproxy.togettoyou.com
spec:
  # 组名
  group: miniproxy.togettoyou.com
  names:
    # kind 通常是单数形式的驼峰编码(CamelCased)形式。你的资源清单会使用这一形式。
    kind: ProxyRoute
    # shortNames 允许你在命令行使用较短的字符串来匹配资源
    shortNames:
      - pr
    # 名称的复数形式,用于 URL:/apis/<组>/<版本>/<名称的复数形式>
    plural: proxyroutes
    # 名称的单数形式,作为命令行使用时和显示时的别名
    singular: proxyroute
  # 可以是 Namespaced 或 Cluster
  scope: Namespaced
  # 列举此 CustomResourceDefinition 所支持的版本
  versions:
    - name: v1alpha1
      # 每个版本都可以通过 served 标志来独立启用或禁止
      served: true
      # 其中一个且只有一个版本必需被标记为存储版本
      storage: true
      # schema 是必需字段
      schema:
        # openAPIV3Schema 是用来检查定制对象的模式定义
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                # 定义我们需要的几个配置项
                host:
                  type: string
                serviceName:
                  type: string
                namespace:
                  type: string
                port:
                  type: integer
                scheme:
                  type: boolean

之后创建它:

kubectl apply -f crd.yaml

现在,就可以创建定制对象啦,将下面的 YAML 保存为 example.yaml

apiVersion: miniproxy.togettoyou.com/v1alpha1
kind: ProxyRoute
metadata:
  name: example-proxyroute
spec:
  # 监听域名
  host: whoami.togettoyou.com
  # 假设你有一个 whomai 的 service,位于 default 命名空间,容器内部端口为 80 ,http 协议
  serviceName: whoami
  namespace: default
  port: 80
  scheme: false

并执行创建命令:

kubectl apply -f example.yaml

结合上文的定义介绍,复习一遍,在这里 proxyroutes 就是我们通过 CRD 创建的定制资源,其中包含着一组 ProxyRoute 对象。

现在可以使用 kubectl 来查看我们刚才创建的定制对象:

$ kubectl get proxyroute
NAME                 AGE
example-proxyroute   49s

$ kubectl get pr
NAME                 AGE
example-proxyroute   50s

实现控制器

创建项目目录如下:

├─pkg
│  └─apis
│      └─miniproxy
│          └─v1alpha1
│              └─doc.go
│              └─register.go
│              └─types.go
│          └─register.go
├─script
│  └─boilerplate.go.txt
│  └─code-gen.sh
│  └─codegen.Dockerfile

doc.go 代码:

// +k8s:deepcopy-gen=package
// +groupName=miniproxy.togettoyou.com

// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1

register.go 代码:

package v1alpha1

import (
 "mini-k8s-proxy/pkg/apis/miniproxy"

 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/apimachinery/pkg/runtime/schema"
)

// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: miniproxy.GroupName, Version: "v1alpha1"}

var (
 // SchemeBuilder initializes a scheme builder
 SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
 // AddToScheme is a global function that registers this API group & version to a scheme
 AddToScheme = SchemeBuilder.AddToScheme
)

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
 return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
 return SchemeGroupVersion.WithResource(resource).GroupResource()
}

// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
 scheme.AddKnownTypes(SchemeGroupVersion,
  // 主要在这里导入我们的定制资源对象
  &ProxyRoute{},
  &ProxyRouteList{},
 )
 metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
 return nil
}

types.go 代码:

package v1alpha1

import (
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ProxyRoute is a specification for a ProxyRoute resource
type ProxyRoute struct {
 metav1.TypeMeta   `json:",inline"`
 metav1.ObjectMeta `json:"metadata,omitempty"`

 Spec ProxyRouteSpec `json:"spec"`
}

// ProxyRouteSpec is the spec for a ProxyRoute resource
type ProxyRouteSpec struct {
 Host        string `json:"host"`
 ServiceName string `json:"serviceName"`
 Namespace   string `json:"namespace,omitempty"`
 Port        int32  `json:"port,omitempty"`
 Scheme      bool   `json:"scheme,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ProxyRouteList is a list of ProxyRoute resources
type ProxyRouteList struct {
 metav1.TypeMeta `json:",inline"`
 metav1.ListMeta `json:"metadata"`

 Items []ProxyRoute `json:"items"`
}

register.go 代码:

package miniproxy

// GroupName is the group name used in this package
const (
 GroupName = "miniproxy.togettoyou.com"
)

代码编写完成后,就可以使用 k8s.io/code-generator 来生成控制器相关代码了,脚本定义在 script 文件夹下,其中 boilerplate.go.txt 为生成的代码头部协议注释,codegen.Dockerfile 内容为:

FROM golang:1.16

ARG KUBE_VERSION

ENV GO111MODULE=on
ENV GOPROXY https://goproxy.cn,direct

RUN go get k8s.io/code-generator@$KUBE_VERSION; exit 0
RUN go get k8s.io/apimachinery@$KUBE_VERSION; exit 0

RUN mkdir -p $GOPATH/src/k8s.io/{code-generator,apimachinery}
RUN cp -R $GOPATH/pkg/mod/k8s.io/code-generator@$KUBE_VERSION $GOPATH/src/k8s.io/code-generator
RUN cp -R $GOPATH/pkg/mod/k8s.io/apimachinery@$KUBE_VERSION $GOPATH/src/k8s.io/apimachinery
RUN chmod +x $GOPATH/src/k8s.io/code-generator/generate-groups.sh

WORKDIR $GOPATH/src/k8s.io/code-generator

code-gen.sh 脚本内容如下:

#!/bin/bash -e

set -e -o pipefail

PROJECT_MODULE="mini-k8s-proxy"
IMAGE_NAME="kubernetes-codegen:latest"

echo "Building codegen Docker image..."
docker build --build-arg KUBE_VERSION=v0.20.2 -f "./script/codegen.Dockerfile" \\
            -t "${IMAGE_NAME}" \\
            "."

cmd="/go/src/k8s.io/code-generator/generate-groups.sh all \\
    ${PROJECT_MODULE}/pkg/generated \\
    ${PROJECT_MODULE}/pkg/apis \\
    miniproxy:v1alpha1 \\
    --go-header-file=/go/src/${PROJECT_MODULE}/script/boilerplate.go.txt"

echo "Generating clientSet code ..."
docker run --rm \\
           -v "$(pwd):/go/src/${PROJECT_MODULE}" \\
           -w "/go/src/${PROJECT_MODULE}" \\
           "${IMAGE_NAME}" $cmd

执行脚本生成相关代码:

$ ./script/code-gen.sh

......
Generating clientSet code ...
Generating deepcopy funcs
Generating clientset for miniproxy:v1alpha1 at mini-k8s-proxy/pkg/generated/clientset
Generating listers for miniproxy:v1alpha1 at mini-k8s-proxy/pkg/generated/listers
Generating informers for miniproxy:v1alpha1 at mini-k8s-proxy/pkg/generated/informers

实现业务逻辑

由于业务较简单,我们直接在 main.go 完成业务逻辑,贴上代码:

type ProxyRouteSpec struct {
 V map[string]v1alpha1.ProxyRouteSpec
 sync.RWMutex
}

var prs = &ProxyRouteSpec{
 V: make(map[string]v1alpha1.ProxyRouteSpec, 0),
}

type resourceEventHandler struct {
 Ev chan<- interface{}
}

func (reh *resourceEventHandler) OnAdd(obj interface{}) {
 eventHandlerFunc(reh.Ev, obj)
}

func (reh *resourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
 eventHandlerFunc(reh.Ev, newObj)
}

func (reh *resourceEventHandler) OnDelete(obj interface{}) {
 eventHandlerFunc(reh.Ev, obj)
}

func eventHandlerFunc(events chan<- interface{}, obj interface{}) {
 select {
 case events <- obj:
 default:
 }
}

func main() {
 ctx := context.Background()

 eventCh := make(chan interface{}, 1)
 // 采用缓冲大小为 1 的通道方式来处理 CRD 事件
 eventHandler := &resourceEventHandler{Ev: eventCh}

 // 作为测试,可以直接使用 kubeconfig 连接 k8s,实际部署使用 InClusterConfig 模式
 //cfg, err := clientcmd.BuildConfigFromFlags("", "tmp/config")
 cfg, err := rest.InClusterConfig()
 if err != nil {
  panic(err)
 }
 client, err := clientset.NewForConfig(cfg)
 if err != nil {
  panic(err)
 }
 // 构建 k8s Crd Informer 实例
 factoryCrd := externalversions.NewSharedInformerFactoryWithOptions(
  client,
  10*time.Minute,
 )
 // 注册 Informer 事件处理
 factoryCrd.Miniproxy().V1alpha1().ProxyRoutes().Informer().AddEventHandler(eventHandler)
 // 启动 Informer
 factoryCrd.Start(ctx.Done())
 // 等待首次缓存同步
 for t, ok := range factoryCrd.WaitForCacheSync(ctx.Done()) {
  if !ok {
   panic(fmt.Errorf("timed out waiting for controller caches to sync %s", t.String()))
  }
 }
 go startServer()

 for {
  select {
  case _, ok := <-eventCh:
   if !ok {
    continue
   }
   // 从 Lister 缓存获取 CRD 资源对象
   proxyRoutes, err := factoryCrd.Miniproxy().V1alpha1().ProxyRoutes().Lister().List(labels.Everything())
   if err != nil {
    log.Println(err.Error())
    continue
   }
   // 清空本地缓存并重新放入
   prs.Lock()
   prs.V = make(map[string]v1alpha1.ProxyRouteSpec, 0)
   for _, proxyRoute := range proxyRoutes {
    fmt.Printf("%+v\\n", proxyRoute)
    prs.V[proxyRoute.Spec.Host] = proxyRoute.Spec
   }
   prs.Unlock()
  }
 }
}

原理比较粗暴,通过 Informer — Lister 机制监听 CRD 资源的变化,并将资源对象存入本地 map 缓存中。

继续添加代理转发逻辑:

func startServer() {
 gin.SetMode(gin.ReleaseMode)
 r := gin.Default()
 r.Any("/*any", handler)
 log.Fatalln(r.Run(":80"))
}

func handler(c *gin.Context) {
 prs.RLock()
 defer prs.RUnlock()
 if proxyRouteSpec, ok := prs.V[c.Request.Host]; ok {
  u := ""
  if proxyRouteSpec.Scheme {
   u += "https://"
  } else {
   u += "http://"
  }
  if proxyRouteSpec.Namespace != "" {
   u += proxyRouteSpec.ServiceName + "." + proxyRouteSpec.Namespace
  } else {
   u += proxyRouteSpec.ServiceName
  }
  if proxyRouteSpec.Port != 0 {
   u += fmt.Sprintf(":%d", proxyRouteSpec.Port)
  }
  log.Println("代理地址: ", u)
  proxyUrl, err := url.Parse(u)
  if err != nil {
   c.AbortWithStatus(http.StatusInternalServerError)
  }
  proxyServer(c, proxyUrl)
 } else {
  c.String(http.StatusNotFound, "404")
 }
}

// 代理转发
func proxyServer(c *gin.Context, proxyUrl *url.URL) {
 proxy := &httputil.ReverseProxy{
  Director: func(outReq *http.Request) {
   u := outReq.URL
   outReq.URL = proxyUrl
   if outReq.RequestURI != "" {
    parsedURL, err := url.ParseRequestURI(outReq.RequestURI)
    if err == nil {
     u = parsedURL
    }
   }

   outReq.URL.Path = u.Path
   outReq.URL.RawPath = u.RawPath
   outReq.URL.RawQuery = u.RawQuery
   outReq.RequestURI = "" // Outgoing request should not have RequestURI

   outReq.Proto = "HTTP/1.1"
   outReq.ProtoMajor = 1
   outReq.ProtoMinor = 1

   if _, ok := outReq.Header["User-Agent"]; !ok {
    outReq.Header.Set("User-Agent", "")
   }

   // Even if the websocket RFC says that headers should be case-insensitive,
   // some servers need Sec-WebSocket-Key, Sec-WebSocket-Extensions, Sec-WebSocket-Accept,
   // Sec-WebSocket-Protocol and Sec-WebSocket-Version to be case-sensitive.
   // https://tools.ietf.org/html/rfc6455#page-20
   outReq.Header["Sec-WebSocket-Key"] = outReq.Header["Sec-Websocket-Key"]
   outReq.Header["Sec-WebSocket-Extensions"] = outReq.Header["Sec-Websocket-Extensions"]
   outReq.Header["Sec-WebSocket-Accept"] = outReq.Header["Sec-Websocket-Accept"]
   outReq.Header["Sec-WebSocket-Protocol"] = outReq.Header["Sec-Websocket-Protocol"]
   outReq.Header["Sec-WebSocket-Version"] = outReq.Header["Sec-Websocket-Version"]
   delete(outReq.Header, "Sec-Websocket-Key")
   delete(outReq.Header, "Sec-Websocket-Extensions")
   delete(outReq.Header, "Sec-Websocket-Accept")
   delete(outReq.Header, "Sec-Websocket-Protocol")
   delete(outReq.Header, "Sec-Websocket-Version")
  },
  Transport: &http.Transport{
   TLSClientConfig: &tls.Config{
    InsecureSkipVerify: true,
   },
  },
  ErrorHandler: func(w http.ResponseWriter, request *http.Request, err error) {
   statusCode := http.StatusInternalServerError
   w.WriteHeader(statusCode)
   w.Write([]byte(http.StatusText(statusCode)))
  },
 }
 proxy.ServeHTTP(c.Writer, c.Request)
}

每次请求连接会从本地缓存读取配置,判断是否匹配,若匹配则转发代理到配置的服务中去。

部署

为了方便测试,我已经编译好镜像上传到 Docker Hub 上,所以大家可以直接使用下面的 yaml 部署:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: proxyroutes.miniproxy.togettoyou.com
spec:
  group: miniproxy.togettoyou.com
  names:
    kind: ProxyRoute
    shortNames:
      - pr
    plural: proxyroutes
    singular: proxyroute
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                host:
                  type: string
                serviceName:
                  type: string
                namespace:
                  type: string
                port:
                  type: integer
                scheme:
                  type: boolean
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mini-k8s-proxy
spec:
  selector:
    matchLabels:
      app: mini-k8s-proxy
  replicas: 1
  template:
    metadata:
      labels:
        app: mini-k8s-proxy
    spec:
      containers:
        - name: mini-k8s-proxy
          image: togettoyou/mini-k8s-proxy:latest
          ports:
            - containerPort: 80

---
apiVersion: v1
kind: Service
metadata:
  name: mini-k8s-proxy-service
spec:
  ports:
    - port: 80
      targetPort: 80
  selector:
    app: mini-k8s-proxy
  type: NodePort

部署:

$ kubectl apply -f mini-k8s-proxy.yaml
customresourcedefinition.apiextensions.k8s.io/proxyroutes.miniproxy.togettoyou.com created
deployment.apps/mini-k8s-proxy created
service/mini-k8s-proxy-service created

$ kubectl get svc
NAME                     TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)        AGE
mini-k8s-proxy-service   NodePort    10.111.139.14   <none>        80:32112/TCP   32s

访问集群域名:32112 ,看到 404 的话,恭喜部署成功。

验证使用 ProxyRoute

现在我有一个名称为 test-service 的 service 处于 testns 命名空间下,容器内部端口为 80(是一个 nginx 服务)

$ kubectl get svc -n testns
NAME           TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
test-service   ClusterIP   10.97.89.250   <none>        80/TCP    2s

我想要当请求 host 为 test.togettoyou.com:32112 的流量请求过来时,可以代理转发到 test-service 上,怎么做呢

按照心里预期,创建一个 ProxyRoute 资源对象:

apiVersion: miniproxy.togettoyou.com/v1alpha1
kind: ProxyRoute
metadata:
  name: test-proxyroute
spec:
  host: test.togettoyou.com:32112
  serviceName: test-service
  namespace: testns
  port: 80
  scheme: false

浏览器访问 test.togettoyou.com:32112 ,神奇的事情就会发生了

image.png

总结

本文通过 CRD + Controller 实现了一个简易的 K8S 代理转发工具,相关代码均上传到了 Github(https://github.com/togettoyou/mini-k8s-proxy)

实现思路来自 Traefik,强烈推荐

最后,感谢您的阅读!

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
利用 CRD 实现一个 mini-k8s-proxy
实现一个可以通过配置 host 拦截到匹配的请求域名,将流量代理转发到具体的 service 中(通过配置 serviceName,namespace,port...
<<上一篇
下一篇>>