问题描述
我有用pytest编写的端到端测试,该测试在命名空间foo
的Kubernetes集群上运行。现在,我想在测试中添加简单的混乱工程,以检查服务的弹性。为此,我只需要删除foo
中的特定容器-由于K8s重新启动了相应的服务,因此这模拟了该服务暂时无法访问。
使用Python删除当前命名空间中的特定容器的简单方法是什么?
到目前为止,我已经尝试过:
由于我没有在https://github.com/kubernetes-client/python/tree/master/examples中找到合适的示例,但是使用pod的示例看起来很复杂,
我看着kubetest
,它看起来非常简单而优雅。
我想使用kube
固定装置,然后执行以下操作:
pods = kube.get_pods()
for pod in pods:
if can_be_temporarily_unreachable(pod):
kube.delete(pod)
我认为使用参数--in-cluster
调用pytest会告诉kubetest
使用当前的群集设置,而不创建新的K8s资源。
但是,kubetest
希望为每个使用kube
固定装置的测试用例创建一个新的名称空间,我不希望这样做。
有没有办法告诉kubetest
不要创建新的名称空间,而是在当前名称空间中执行所有操作?
尽管kubetest
看起来非常简单优雅,但我也很高兴使用其他解决方案。
一个简单的解决方案,只需很少的时间和维护,并且不会使测试复杂化(读),真是棒极了。
解决方法
您可以使用delete_namespaced_pod
(来自CoreV1Api)删除命名空间中的特定容器。
这里是一个例子:
from kubernetes import client,config
from kubernetes.client.rest import ApiException
config.load_incluster_config() # or config.load_kube_config()
configuration = client.Configuration()
with client.ApiClient(configuration) as api_client:
api_instance = client.CoreV1Api(api_client)
namespace = 'kube-system' # str | see @Max Lobur's answer on how to get this
name = 'kindnet-mpkvf' # str | Pod name,e.g. via api_instance.list_namespaced_pod(namespace)
try:
api_response = api_instance.delete_namespaced_pod(name,namespace)
print(api_response)
except ApiException as e:
print("Exception when calling CoreV1Api->delete_namespaced_pod: %s\n" % e)
,
Tibebes对现有答案的延续。 M
如何找出代码本身在其中运行的名称空间?
有两种方法可以做到这一点:
- 使用Downward API将Pod名称空间传递给Pod环境变量:
- 使用模板引擎/头盔将环境变量名称空间另外传递给Pod。例:
env: - name: CURRENT_NAMESPACE value: "{{ .Values.namespace }}"