1. Introduction
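I have recently been building a feature for our job scheduling system that runs offline jobs in containers.
Naturally, the code needs a Kubernetes (K8S) Java client to interact with those jobs.
While evaluating options, I found two Kubernetes Java clients: the official one and the one from fabric8. The official client looks like this: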
public class Example {
    public static void main(String[] args) throws IOException, ApiException {
        ApiClient client = Config.defaultClient();
        Configuration.setDefaultApiClient(client);
        CoreV1Api api = new CoreV1Api();
        V1PodList list = api.listPodForAllNamespaces(null, null, null, null, null, null, null, null, null);
        for (V1Pod item : list.getItems()) {
            System.out.println(item.getMetadata().getName());
        }
    }
}
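This is the demo shipped with the official client. The long run of positional nulls passed to listPodForAllNamespaces shows that the code is essentially generated from the Kubernetes OpenAPI specification rather than designed by hand.
The fabric8 client, by comparison, offers a fluent API: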
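import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ListExamples {
    private static final Logger logger = LoggerFactory.getLogger(ListExamples.class);

    public static void main(String[] args) {
        String master = "https://192.168.42.20:8443/";
        Config config = new ConfigBuilder().withMasterUrl(master).build();
        // The client is Closeable, so try-with-resources releases its connections
        try (final KubernetesClient client = new DefaultKubernetesClient(config)) {
            System.out.println(client.pods().list());
        } catch (KubernetesClientException e) {
            logger.error(e.getMessage(), e);
        }
    }
}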
Its API is far more readable, so I chose the second client. That choice is where the trouble began.
2. Scenario
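While implementing the feature that runs jobs on K8S, I ran into the following scenario:
after the service restarts, it has to recover the state of every job from the K8S API Server.
Has the job been created?
Is the job running?
Has the job finished?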
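The three questions above map fairly directly onto fields of a Job's status. The following minimal sketch is not code from this project. It assumes that recovery only inspects the active, succeeded and failed counters of the Kubernetes JobStatus, and that a lookup returning null means the Job was never created. JobStateRecovery, RecoveredState and recover are hypothetical names. Plain Integer values stand in for the client's status object so the example has no dependencies:

```java
class JobStateRecovery {
    /** Coarse job state after a service restart (hypothetical names). */
    enum RecoveredState { NOT_CREATED, PENDING, RUNNING, SUCCEEDED, FAILED }

    /**
     * jobExists: whether the lookup by name returned a Job (i.e. get() != null).
     * active/succeeded/failed: JobStatus counters; may be null when unset.
     */
    static RecoveredState recover(boolean jobExists, Integer active, Integer succeeded, Integer failed) {
        if (!jobExists) {
            return RecoveredState.NOT_CREATED;   // "Has the job been created?" -> no
        }
        if (orZero(active) > 0) {
            return RecoveredState.RUNNING;       // at least one pod is still active (possibly a retry)
        }
        if (orZero(succeeded) > 0) {
            return RecoveredState.SUCCEEDED;     // no active pods and at least one success
        }
        if (orZero(failed) > 0) {
            return RecoveredState.FAILED;        // no active pods, only failures recorded
        }
        return RecoveredState.PENDING;           // created, but no pod has started yet
    }

    private static int orZero(Integer value) {
        return value == null ? 0 : value;
    }

    public static void main(String[] args) {
        System.out.println(recover(false, null, null, null)); // NOT_CREATED
        System.out.println(recover(true, 1, null, null));     // RUNNING
        System.out.println(recover(true, null, 1, null));     // SUCCEEDED
    }
}
```

A production version would probably also check the Job's Complete and Failed conditions. When retries (backoffLimit) are involved, those conditions are more authoritative than the counters.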
So the first step is to look up each Job's status in K8S by its name. I wrote the following code:
public void jobTest() {
    final Job job = this.client.batch().jobs().withName("demo-data").fromServer().get();
    System.out.println(job);
}
This is the implementation of the get method:
public T get() {
    try {
        T answer = getMandatory();
        if (answer instanceof HasMetadata) {
            HasMetadata hasMetadata = (HasMetadata) answer;
            updateApiVersion(hasMetadata);
        } else if (answer instanceof KubernetesResourceList) {
            KubernetesResourceList list = (KubernetesResourceList) answer;
            updateApiVersion(list);
        }
        return answer;
    } catch (KubernetesClientException e) {
        if (e.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
            throw e;
        }
        return null;
    }
}
So when the resource does not exist (HTTP 404), get should return null. Its counterpart is the require method:
public T require() throws ResourceNotFoundException {
    try {
        T answer = getMandatory();
        if (answer == null) {
            throw new ResourceNotFoundException("The resource you request doesn't exist or couldn't be fetched.");
        }
        if (answer instanceof HasMetadata) {
            HasMetadata hasMetadata = (HasMetadata) answer;
            updateApiVersion(hasMetadata);
        } else if (answer instanceof KubernetesResourceList) {
            KubernetesResourceList list = (KubernetesResourceList) answer;
            updateApiVersion(list);
        }
        return answer;
    } catch (KubernetesClientException e) {
        if (e.getCode() != HttpURLConnection.HTTP_NOT_FOUND) {
            throw e;
        }
        throw new ResourceNotFoundException("Resource not found : " + e.getMessage());
    }
}
which throws a ResourceNotFoundException if the resource does not exist.
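The difference between the two methods comes down to a single branch on the HTTP status code. The sketch below reproduces that branching in plain Java so it runs without a cluster. HttpError and NotFound are hypothetical stand-ins for KubernetesClientException and ResourceNotFoundException, and a Supplier stands in for getMandatory(). It shows that get turns only a 404 into null, while any other code, such as 403, reaches the caller unchanged:

```java
import java.util.function.Supplier;

class GetVsRequire {
    /** Hypothetical stand-in for KubernetesClientException: only the HTTP code matters here. */
    static class HttpError extends RuntimeException {
        final int code;
        HttpError(int code, String message) { super(message); this.code = code; }
    }

    /** Hypothetical stand-in for ResourceNotFoundException. */
    static class NotFound extends Exception {
        NotFound(String message) { super(message); }
    }

    /** Same branching as get(): swallow 404 as null, rethrow everything else. */
    static <T> T get(Supplier<T> fetch) {
        try {
            return fetch.get();
        } catch (HttpError e) {
            if (e.code != 404) {
                throw e;          // 403, 500, ... reach the caller unchanged
            }
            return null;          // "not found" is not an error for get()
        }
    }

    /** Same branching as require(): turn 404 (or a null answer) into a checked exception. */
    static <T> T require(Supplier<T> fetch) throws NotFound {
        try {
            T answer = fetch.get();
            if (answer == null) {
                throw new NotFound("The resource you request doesn't exist or couldn't be fetched.");
            }
            return answer;
        } catch (HttpError e) {
            if (e.code != 404) {
                throw e;
            }
            throw new NotFound("Resource not found : " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        Supplier<String> missing = () -> { throw new HttpError(404, "not found"); };
        Supplier<String> forbidden = () -> { throw new HttpError(403, "forbidden"); };

        System.out.println(get(missing)); // null
        try {
            get(forbidden);
        } catch (HttpError e) {
            System.out.println("get() rethrew HTTP " + e.code); // get() rethrew HTTP 403
        }
    }
}
```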
But for a Job that had not been created yet, get did not return null. It threw this exception instead:
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://master01.dev.rack.xxxxx.com:6443/apis/batch/v1/namespaces/ares-worker/jobs/demo-data. Message: Forbidden! User ares-worker doesn't have permission. jobs.extensions "demo-data" is forbidden: User "system:serviceaccount:ares-worker:ares-worker" cannot get resource "jobs" in API group "extensions" in the namespace "ares-worker".
The response code was actually 403! The error message says cannot get resource "jobs" in API group "extensions".
Yet the URL in the same message clearly uses the batch API group, so the error does not match the request that was sent!
3. Troubleshooting
When something goes wrong, the first suspect is usually a mismatched client version. So let's check the K8S version first:
kubectl version
Client Version: version.Info{Major:"1", Minor:"14", GitVersion:"v1.14.7", GitCommit:"8fca2ec50a6133511b771a11559e24191b1aa2b4", GitTreeState:"clean", BuildDate:"2019-09-18T14:47:22Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"15", GitVersion:"v1.15.3", GitCommit:"2d3c76f9091b6bec110a5e63777c332469e0cba2", GitTreeState:"clean", BuildDate:"2019-08-19T11:05:50Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"linux/amd64"}
The server runs v1.15.3 and the client is 4.6.1. According to the client's README, these two versions are fully compatible!
With no better idea, I fell back on stepping through the code in a debugger and finally found the answer in this class:
public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    Response response = chain.proceed(request);
    if (!response.isSuccessful() && responseCodeToTransformations.keySet().contains(response.code())) {
        String url = request.url().toString();
        Matcher matcher = getMatcher(url);
        ResourceKey key = getKey(matcher);
        ResourceKey target = responseCodeToTransformations.get(response.code()).get(key);
        if (target != null) {
            response.close(); // At this point, we know we won't reuse or return the response; so close it to avoid a connection leak.
            String newUrl = new StringBuilder(url)
                .replace(matcher.start(API_VERSION), matcher.end(API_VERSION), target.version) // Order matters: We need to substitute right to left, so that former substitution don't affect the indexes of later.
                .replace(matcher.start(API_GROUP), matcher.end(API_GROUP), target.group)
                .toString();
            Request.Builder newRequest = request.newBuilder()
                .url(newUrl);
            Buffer buffer = new Buffer();
            if (request.body() != null && !request.method().equalsIgnoreCase(PATCH)) {
                request.body().writeTo(buffer);
                Object object = Serialization.unmarshal(buffer.inputStream());
                if (object instanceof HasMetadata) {
                    HasMetadata h = (HasMetadata) object;
                    h.setApiVersion(target.group + "/" + target.version);
                    newRequest = newRequest.method(request.method(), RequestBody.create(OperationSupport.JSON, Serialization.asJson(h)));
                }
            }
            return chain.proceed(newRequest.build());
        }
    }
    return response;
}
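In short, the interceptor acts when the Kubernetes API Server responds with a non-2xx code and that code, together with the requested resource, appears in a hard-coded table (responseCodeToTransformations). It then closes the response and retries with a rewritten URL. The purpose is backward compatibility with older API servers.
The rewritten URL reaches the resource through the legacy endpoint in the extensions API group. Here the original batch/v1 request presumably returned 404 because the Job did not exist yet. The retry against extensions was then rejected with 403, because the service account has no permission on that group. Since get only swallows 404, that 403 surfaced as the error shown above.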
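The URL rewriting step can be illustrated with java.util.regex. This is a simplified sketch, not fabric8's actual implementation. The path pattern, the string key and the single table entry (404 on batch/v1 jobs falls back to extensions/v1beta1) are all assumptions: the extensions group comes from the error message above, while the v1beta1 version is a guess. As in the interceptor, the version is replaced before the group so that the earlier match indexes stay valid:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class FallbackUrlRewrite {
    // Hypothetical pattern: /apis/{group}/{version}/[namespaces/{ns}/]{plural}...
    private static final Pattern API_PATH = Pattern.compile(
        "/apis/(?<group>[^/]+)/(?<version>[^/]+)/(?:namespaces/[^/]+/)?(?<plural>[^/?]+)");

    // Assumed fallback table: "code plural group/version" -> "group/version" to retry with.
    private static final Map<String, String> FALLBACKS = Map.of(
        "404 jobs batch/v1", "extensions/v1beta1");

    /** Returns the retry URL, or null when the original response should be returned as-is. */
    static String fallbackUrl(String url, int code) {
        Matcher m = API_PATH.matcher(url);
        if (!m.find()) {
            return null;                       // e.g. core /api/v1 URLs are not rewritten
        }
        String key = code + " " + m.group("plural") + " " + m.group("group") + "/" + m.group("version");
        String target = FALLBACKS.get(key);
        if (target == null) {
            return null;                       // no fallback registered for this code/resource
        }
        String[] groupVersion = target.split("/");
        // Replace right to left: the version comes after the group in the URL,
        // so rewriting it first leaves the group's start/end indexes untouched.
        return new StringBuilder(url)
            .replace(m.start("version"), m.end("version"), groupVersion[1])
            .replace(m.start("group"), m.end("group"), groupVersion[0])
            .toString();
    }

    public static void main(String[] args) {
        String url = "https://master:6443/apis/batch/v1/namespaces/ares-worker/jobs/demo-data";
        // https://master:6443/apis/extensions/v1beta1/namespaces/ares-worker/jobs/demo-data
        System.out.println(fallbackUrl(url, 404));
        System.out.println(fallbackUrl(url, 403)); // null
    }
}
```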
4. Solution
With the root cause identified, the fix is straightforward:
- Detect this case in the business code by matching the specific exception message.
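Matching the message could look like the helper below. It is a hypothetical sketch. It assumes that the fragment in API group "extensions" from the error message above stays stable across API server versions. The helper takes the status code and message as plain values so it runs without the client library:

```java
class FallbackErrorClassifier {
    // Fragment copied from the error message above; assumed stable across server versions.
    private static final String EXTENSIONS_MARKER = "in API group \"extensions\"";

    /**
     * Hypothetical helper: true when a 403 came from the client's fallback request
     * against the legacy "extensions" group, i.e. the original batch/v1 lookup
     * most likely failed with 404 because the Job does not exist.
     */
    static boolean isLegacyFallbackForbidden(int code, String message) {
        return code == 403 && message != null && message.contains(EXTENSIONS_MARKER);
    }

    public static void main(String[] args) {
        String msg = "jobs.extensions \"demo-data\" is forbidden: User \"system:serviceaccount:ares-worker:ares-worker\" "
            + "cannot get resource \"jobs\" in API group \"extensions\" in the namespace \"ares-worker\".";
        System.out.println(isLegacyFallbackForbidden(403, msg)); // true
        System.out.println(isLegacyFallbackForbidden(403, "cannot get resource \"jobs\" in API group \"batch\"")); // false
    }
}
```

In the business code, one would catch KubernetesClientException around the get call. When isLegacyFallbackForbidden(e.getCode(), e.getMessage()) returns true, the Job is treated as not found (null). The check depends on the wording of a server-side message, so it is fragile and worth covering with a test.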