背景
由于公司内部所有服务都是跑在阿里云 k8s 上的,然后 dubbo 提供者默认向注册中心上报的 IP 都是Pod IP
,这意味着在 k8s 集群外的网络环境是调用不了 dubbo 服务的,如果本地开发需要访问 k8s 内的 dubbo 提供者服务的话,需要手动把服务暴露到外网,我们的做法是针对每一个提供者服务暴露一个SLB IP+自定义端口
,并且通过 dubbo 提供的DUBBO_IP_TO_REGISTRY
和DUBBO_PORT_TO_REGISTRY
环境变量来把对应的SLB IP+自定义端口
注册到注册中心里,这样就实现了本地网络和 k8s dubbo 服务的打通,但是这种方式管理起来非常麻烦,每个服务都得自定义一个端口,而且每个服务之间端口还不能冲突,当服务多起来之后非常难以管理。
于是我就在想能不能像nginx ingress
一样实现一个七层代理+虚拟域名
来复用一个端口,通过目标 dubbo 提供者的application.name
来做对应的转发,这样的话所有的服务只需要注册同一个SLB IP+端口
就可以了,大大的提升便利性,一方调研之后发现可行就开撸了!
项目已开源:https://github.com/monkeyWie/dubbo-ingress-controller
技术预研
思路
- 首先 dubbo RPC 调用默认是走的
dubbo协议
,所以我需要先去看看协议里有没有可以利用做转发的报文信息,就是寻找类似于 HTTP 协议里的 Host 请求头,如果有的话就可以根据此信息做反向代理
和虚拟域名
的转发,在此基础之上实现一个类似nginx
的dubbo网关
。 - 第二步就是要实现
dubbo ingress controller
,通过 k8s ingress 的 watcher 机制动态的更新dubbo 网关
的虚拟域名转发配置,然后所有的提供者服务都由此服务同一转发,并且上报到注册中心的地址也统一为此服务的地址。
架构图
dubbo 协议
先上一个官方的协议图:
可以看到 dubbo 协议的 header 是固定的16个字节
,里面并没有类似于 HTTP Header 的可扩展字段,也没有携带目标提供者的application.name
字段,于是我向官方提了个issue,官方的答复是通过消费者自定义Filter
来将目标提供者的application.name
放到attachments
里,这里不得不吐槽下 dubbo 协议,扩展字段竟然是放在body
里,如果要实现转发需要把请求报文全部解析完才能拿到想要报文,不过问题不大,因为主要是做给开发环境用的,这一步勉强可以实现。
k8s ingress
k8s ingress 是为 HTTP 而生的,但是里面的字段够用了,来看一段 ingress 配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| apiVersion: extensions/v1beta1 kind: Ingress metadata: name: user-rpc-dubbo annotations: kubernetes.io/ingress.class: "dubbo" spec: rules: - host: user-rpc http: paths: - backend: serviceName: user-rpc servicePort: 20880 path: /
|
配置和 http 一样通过host
来做转发规则,但是host
配置的是目标提供者的application.name
,后端服务是目标提供者对应的service
,这里有一个比较特殊的是使用了一个kubernetes.io/ingress.class
注解,这个注解可以指定此ingress
对哪个ingress controller
生效,后面我们的dubbo ingress controller
就只会解析注解值为dubbo
的 ingress 配置。
开发
前面的技术预研一切顺利,接着就进入开发阶段了。
消费者自定义 Filter
前面有提到如果请求里要携带目标提供者的application.name
,需要消费者自定义Filter
,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12
| @Activate(group = CONSUMER) public class AddTargetFilter implements Filter {
@Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String targetApplication = StringUtils.isBlank(invoker.getUrl().getRemoteApplication()) ? invoker.getUrl().getGroup() : invoker.getUrl().getRemoteApplication(); invocation.setAttachment("target-application", targetApplication); return invoker.invoke(invocation); } }
|
这里又要吐槽一下,dubbo 消费者首次访问时会发起一个获取 metadata 的请求,这个请求通过invoker.getUrl().getRemoteApplication()
是拿不到值的,通过invoker.getUrl().getGroup()
才能拿到。
dubbo 网关
这里就要开发一个类似nginx
的dubbo网关
,并实现七层代理和虚拟域名转发,编程语言直接选择了 go,首先 go 做网络开发心智负担低,另外有个 dubbo-go 项目,可以直接利用里面的解码器,然后 go 有原生的 k8s sdk 支持,简直完美!
思路就是开启一个TCP Server
,然后解析 dubbo 请求的报文,把attachment
里的target-application
属性拿到,再反向代理到真正的 dubbo 提供者服务上,核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| routingTable := map[string]string{ "user-rpc": "user-rpc:20880", "pay-rpc": "pay-rpc:20880", }
listener, err := net.Listen("tcp", ":20880") if err != nil { return err } for { clientConn, err := listener.Accept() if err != nil { logger.Errorf("accept error:%v", err) continue } go func() { defer clientConn.Close() var proxyConn net.Conn defer func() { if proxyConn != nil { proxyConn.Close() } }() scanner := bufio.NewScanner(clientConn) scanner.Split(split) for scanner.Scan() { data := scanner.Bytes() buf := bytes.NewBuffer(data) pkg := impl.NewDubboPackage(buf) pkg.Unmarshal() body := pkg.Body.(map[string]interface{}) attachments := body["attachments"].(map[string]interface{}) target := attachments["target-application"].(string) if proxyConn == nil { host := routingTable[target] proxyConn, _ = net.Dial("tcp", host) go func() { io.Copy(clientConn, proxyConn) }() } proxyConn.Write(data) } }() }
func split(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { return 0, nil, nil }
buf := bytes.NewBuffer(data) pkg := impl.NewDubboPackage(buf) err = pkg.ReadHeader() if err != nil { if errors.Is(err, hessian.ErrHeaderNotEnough) || errors.Is(err, hessian.ErrBodyNotEnough) { return 0, nil, nil } return 0, nil, err } if !pkg.IsRequest() { return 0, nil, errors.New("not request") } requestLen := impl.HEADER_LENGTH + pkg.Header.BodyLen if len(data) < requestLen { return 0, nil, nil } return requestLen, data[0:requestLen], nil }
|
dubbo ingress controller 实现
前面已经实现了一个dubbo网关
,但是里面的虚拟域名转发配置(routingTable
)还是写死在代码里的,现在要做的就是当检测到k8s ingress
有更新时,动态的更新这个配置就可以了。
首先先简单的说明下ingress controller
的原理,拿我们常用的nginx ingress controller
为例,它也是一样通过监听k8s ingress
资源变动,然后动态的生成nginx.conf
文件,当发现配置发生了改变时,触发nginx -s reload
重新加载配置文件。
里面用到的核心技术就是informers,利用它来监听k8s资源
的变动,示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| cfg, err := rest.InClusterConfig() if err != nil { logger.Fatal(err) }
client, err := kubernetes.NewForConfig(cfg) if err != nil { logger.Fatal(err) }
factory := informers.NewSharedInformerFactory(client, time.Minute) handler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { }, UpdateFunc: func(oldObj, newObj interface{}) { }, DeleteFunc: func(obj interface{}) { }, }
informer := factory.Extensions().V1beta1().Ingresses().Informer() informer.AddEventHandler(handler) informer.Run(ctx.Done())
|
通过实现上面的三个事件来动态的更新转发配置,每个事件都会携带对应的Ingress
对象信息过来,然后进行对应的处理即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| ingress, ok := obj.(*v1beta12.Ingress) if ok { ingressClass := ingress.Annotations["kubernetes.io/ingress.class"] if ingressClass == "dubbo" && len(ingress.Spec.Rules) > 0 { rule := ingress.Spec.Rules[0] if len(rule.HTTP.Paths) > 0 { backend := rule.HTTP.Paths[0].Backend host := rule.Host service := fmt.Sprintf("%s:%d", backend.ServiceName+"."+ingress.Namespace, backend.ServicePort.IntVal) notify(host,service) } } }
|
docker 镜像提供
k8s 之上所有的服务都需要跑在容器里的,这里也不例外,需要把dubbo ingress controller
构建成 docker 镜像,这里通过两阶段构建优化,来减小镜像体积:
1 2 3 4 5 6 7 8 9 10 11 12 13
| FROM golang:1.17.3 AS builder WORKDIR /src COPY . . ENV GOPROXY https://goproxy.cn ENV CGO_ENABLED=0 RUN go build -ldflags "-w -s" -o main cmd/main.go
FROM debian AS runner ENV TZ=Asia/shanghai WORKDIR /app COPY --from=builder /src/main . RUN chmod +x ./main ENTRYPOINT ["./main"]
|
yaml 模板提供
由于要在集群内访问 k8s API,需要给 Pod 进行授权,通过K8S rbac
进行授权,并以Deployment
类型服务进行部署,最终模板如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| apiVersion: v1 kind: ServiceAccount metadata: name: dubbo-ingress-controller namespace: default
--- kind: ClusterRole apiVersion: rbac.authorization.k8s.io/v1beta1 metadata: name: dubbo-ingress-controller rules: - apiGroups: - extensions resources: - ingresses verbs: - get - list - watch
--- kind: ClusterRoleBinding apiVersion: rbac.authorization.k8s.io/v1beta1 metadata: name: dubbo-ingress-controller roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: dubbo-ingress-controller subjects: - kind: ServiceAccount name: dubbo-ingress-controller namespace: default
--- apiVersion: apps/v1 kind: Deployment metadata: namespace: default name: dubbo-ingress-controller labels: app: dubbo-ingress-controller spec: selector: matchLabels: app: dubbo-ingress-controller template: metadata: labels: app: dubbo-ingress-controller spec: serviceAccountName: dubbo-ingress-controller containers: - name: dubbo-ingress-controller image: liwei2633/dubbo-ingress-controller:0.0.1 ports: - containerPort: 20880
|
后期需要的话可以做成Helm
进行管理。
后记
至此dubbo ingress controller
实现完成,可以说麻雀虽小但是五脏俱全,里面涉及到了dubbo协议
、TCP协议
、七层代理
、k8s ingress
、docker
等等很多内容,这些很多知识都是在云原生
越来越流行的时代需要掌握的,开发完之后感觉受益匪浅。
关于完整的使用教程可以通过github查看。
参考链接: