记一个 Fabric8 K8S Client 的坑

1. 前言

最近在作业调度系统上开发一个 feature,把离线作业放到容器里面跑。

自然而然的,需要在代码里使用 K8S 的 Java 客户端来与作业进行交互。

在技术选型的时候,发现了两个 Kubenetes 的 Java 客户端:

第一个是官方推出的 Java 客户端,但是 API 的使用体验巨差。随便举个例子:

 public class Example {
     public static void main(String[] args) throws IOException, ApiException{
         ApiClient client = Config.defaultClient();
         Configuration.setDefaultApiClient(client);
 
         CoreV1Api api = new CoreV1Api();
         V1PodList list = api.listPodForAllNamespaces(null, null, null, null, null, null, null, null, null);
         for (V1Pod item : list.getItems()) {
             System.out.println(item.getMetadata().getName());
         }
     }
 }

这是官方提供的一个 demo,可以看到 listPodForAllNamespaces 这个调用传递的参数列表,基本可以认为是通过某种协议生成的代码。

第二个客户端虽然非官方,但是 API 对于用户的友好性与官方库相比简直是天差地别。同样的代码:

 public class ListExamples {
 
   private static final Logger logger = LoggerFactory.getLogger(ListExamples.class);
 
   public static void main(String[] args) {
     String master = "https://192.168.42.20:8443/";
 
     Config config = new ConfigBuilder().withMasterUrl(master).build();
     try (final KubernetesClient client = new DefaultKubernetesClient(config)) {
 
       System.out.println(client.pods().list());
     } catch (KubernetesClientException e) {
       logger.error(e.getMessage(), e);
     }
   }
 
 }

所以在选型的时候选择了第二个客户端,这也是踩坑的开始。

2. 场景描述

实现在 K8S 运行作业的 feature 的过程中,遇到了这样一个场景:

服务进行重启之后,需要从 K8S 的 API Server 中恢复每一个作业的状态。

  • 作业是否已经创建了?

  • 作业是否正在运行?

  • 作业已经跑完了?

所以第一件事应该是根据 Job 的名字去 K8S 查询 Job 的状态。写下了如下代码:

   @Test
   public void jobTest() {
     final Job job = this.client.batch().jobs().withName("demo-data").fromServer().get();
     System.out.println(job);
   }

简单来看看最后 get 方法的具体内容:

   @Override
   public T get() {
     try {
       T answer = getMandatory();
       if (answer instanceof HasMetadata) {
         HasMetadata hasMetadata = (HasMetadata) answer;
         updateApiVersion(hasMetadata);
       } else if (answer instanceof KubernetesResourceList) {
         KubernetesResourceList list = (KubernetesResourceList) answer;
         updateApiVersion(list);
       }
       return answer;
     } catch (KubernetesClientException e) {
       if (e.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
         throw e;
       }
       return null;
     }
   }

从代码的内容来看,如果资源不存在 get 方法应该返回一个 null 值。与此相对的,还有一个 require 方法:

   @Override
   public T require() throws ResourceNotFoundException {
     try {
       T answer = getMandatory();
       if (answer == null) {
         throw new ResourceNotFoundException("The resource you request doesn't exist or couldn't be fetched.");
       }
       if (answer instanceof HasMetadata) {
         HasMetadata hasMetadata = (HasMetadata) answer;
         updateApiVersion(hasMetadata);
       } else if (answer instanceof KubernetesResourceList) {
         KubernetesResourceList list = (KubernetesResourceList) answer;
         updateApiVersion(list);
       }
       return answer;
     } catch (KubernetesClientException e) {
       if (e.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
         throw e;
       }
       throw new ResourceNotFoundException("Resource not found : " + e.getMessage());
     }
   }

如果不存在则会抛出一个 ResourceNotFoundException

 io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://master01.dev.rack.xxxxx.com:6443/apis/batch/v1/namespaces/ares-worker/jobs/demo-data. Message: Forbidden! User ares-worker doesn't have permission. jobs.extensions "demo-data" is forbidden: User "system:serviceaccount:ares-worker:ares-worker" cannot get resource "jobs" in API group "extensions" in the namespace "ares-worker".

打个断点,可以看到如下情况:

image-20191023171256096

Response 的 code 竟然是 403!错误提示的内容表示 cannot get resource "jobs" in API group "extensions"

但是从 URL 来看使用的的确是 batch 的 API group,这个错误信息不符合事实啊!

3. 排查过程

除了问题,第一直觉应该是客户端的版本引用有问题。所以首先看一下 K8S 的版本:

 kubectl version
 Client Version: version.Info{Major:"1", Minor:"14", GitVersion:"v1.14.7", GitCommit:"8fca2ec50a6133511b771a11559e24191b1aa2b4", GitTreeState:"clean", BuildDate:"2019-09-18T14:47:22Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
 Server Version: version.Info{Major:"1", Minor:"15", GitVersion:"v1.15.3", GitCommit:"2d3c76f9091b6bec110a5e63777c332469e0cba2", GitTreeState:"clean", BuildDate:"2019-08-19T11:05:50Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"linux/amd64"}

可以看到 Server 端的版本为 v1.15.3,而客户端的版本是 4.6.1。从 README 来看版本是完美匹配的啊!

那么是不是集群侧的权限设置有问题呢?答案是否定的,因为同样的代码在查询的确存在的 Job 时的确能正确返回。

无奈,只好开始断点 debug 大法,终于让我在 io.fabric8.kubernetes.client.utils.BackwardsCompatibilityInterceptor 这个类里面找到了答案:

   public Response intercept(Chain chain) throws IOException {
     Request request = chain.request();
     Response response = chain.proceed(request);
     if (!response.isSuccessful() && responseCodeToTransformations.keySet().contains(response.code())) {
       String url = request.url().toString();
       Matcher matcher = getMatcher(url);
       ResourceKey key = getKey(matcher);
       ResourceKey target = responseCodeToTransformations.get(response.code()).get(key);
       if (target != null) {
         response.close(); // At this point, we know we won't reuse or return the response; so close it to avoid a connection leak.
         String newUrl = new StringBuilder(url)
             .replace(matcher.start(API_VERSION), matcher.end(API_VERSION), target.version) // Order matters: We need to substitute right to left, so that former substitution don't affect the indexes of later.
             .replace(matcher.start(API_GROUP), matcher.end(API_GROUP), target.group)
             .toString();
 
         Request.Builder newRequest = request.newBuilder()
           .url(newUrl);
 
         Buffer buffer = new Buffer();
         if (request.body() != null && !request.method().equalsIgnoreCase(PATCH)) {
           request.body().writeTo(buffer);
             Object object = Serialization.unmarshal(buffer.inputStream());
           if (object instanceof HasMetadata) {
             HasMetadata h = (HasMetadata) object;
             h.setApiVersion(target.group + "/" + target.version);
             newRequest = newRequest.method(request.method(), RequestBody.create(OperationSupport.JSON, Serialization.asJson(h)));
           }
         }
 
         return chain.proceed(newRequest.build());
       }
     }
     return response;
   }

简单来说,如果 Kubenetes API Server 响应的 code 不是 200 并且在代码硬编码的某种配置中,会使用一个新的 URL 来请求。目的是实现向下的版本兼容性!

而这个新的 URL 采用了老版本的接口访问资源,所在的 API group 即为 extensions,导致返回了上面的错误信息。

4. 解决方案

查到了问题所在,解决就比较简单了:

  1. 自己构建客户端使用到 OkHttpClient,去掉这个拦截器。

  2. 在业务代码中通过匹配特定异常信息进行识别。