概览
Python 环境下的 CloudTower SDK,适用于 2.7 和 3.4 及以上版本
安装
- whl- pip install cloudtower_sdk-1.9.0-py2.py3-none-any.whl
- tar.gz- tar xvzf cloudtower-sdk-1.9.0.tar.gz
 cd cloudtower-sdk-1.90.tar.gz
 python setup.py install
- git 源码安装- git clone https://github.com/smartxworks/cloudtower-python-sdk.git
 cd clodtower-python-sdk
 python setup.py install
- git pip 安装- pip install git+https://github.com/smartxworks/cloudtower-python-sdk.git
- pypi 安装- pip install cloudtower-sdk
使用
创建实例
创建 ApiClient 实例
from cloudtower.configuration import Configuration
from cloudtower import ApiClient
# 配置 operation-api endpoint
configuration = Configuration(host="http://192.168.96.133/v2/api")
client = ApiClient(configuration)
如果需要使用 https,可以安装证书,或者忽略证书验证
configuration = Configuration(host="https://192.168.96.133/v2/api")
configuration.verify_ssl = False
client = ApiClient(configuration)
创建对应的 API 实例
根据不同用途的操作创建相关的 API 实例,例如虚拟机相关操作需要创建一个
VmApi。
from cloudtower.api.vm_api import VmApi
vm_api = VmApi(client)
鉴权
from cloudtower.api.user_api import UserApi
from cloudtower.models import UserSource
# 通过 UserApi 中的 login 方法来获得 token。
user_api = UserApi(client)
login_res = user_api.login({
    "username": "your_username",
    "password": "your_password",
    "source": UserSource.LOCAL
})
# 将 token 配置在 configuration.api_key["Authorization"] 中,
# 这样所有使用当前 client 的 api 都会获得鉴权的 token 信息。
configuration.api_key["Authorization"] = login_res.data.token
发送请求
获取资源
vms = vm_api.get_vms({
  "where": {
    "id": "vm_id"
  },
  "first":1,
})
更新资源
资源更新会产生相关的异步任务,当异步任务结束时,代表资源操作完成且数据已更新。
start_res = vm_api.start_vm({
  "where": {
    "id": "stopped_vm_id"
  },
})
可以通过提供的工具方法同步等待异步任务结束
from cloudtower.utils import wait_tasks
try:
 wait_tasks([res.task_id for res in start_res], api_client)
except ApiException as e:
 # 处理错误
else:
 # task完成后的回调
方法参数说明
| 参数名 | 类型 | 是否必须 | 说明 | 
|---|---|---|---|
| ids | list[str] | 是 | 需查询的 task 的 id 列表 | 
| api_client | ApiClient | 是 | 查询所使用的 ApiClient 实例 | 
| interval | int | 否 | 轮询的间隔时间,默认为 5s | 
| timeout | int | 否 | 超时时间,默认为 300s | 
| exit_on_error | bool | 否 | 是否在单个 Task 出错时立即退出,否则则会等待全部 Task 都完成后再退出,默认为 False。 | 
错误说明
| 错误码 | 说明 | 
|---|---|
| 408 | 超时 | 
| 500 | 异步任务内部错误 | 
自定义 header
cloudtower api 支持通过设置 header 中的 content-language 来设置返回信息的语言, 可选值
en-US,zh-CN。默认为en-US。
通过 ApiClient 的 set_default_header 方法
可以通过
ApiClient的set_default_header方法设置默认的 header 信息。
api_client.set_default_header("content_language","en-US")
alert_api = AlertApi(api_client)
# 此时得到的 alerts 中的 message, solution, cause, impact 将被转换为英文描述。
alerts = alert_api.get_alerts(
  {
    "where": {
      "cluster": {
        "id": "cluster_id"
      }
    },
    "first": 100
  },
)
通过设置请求的关键字参数
也可以通过设置请求的关键字参数
content_language来设置返回信息的语言。
from cloudtower.api.alert_api import AlertApi
alert_api = AlertApi(api_client)
# 此时得到的 alerts 中的 message, solution, cause, impact 将被转换为中文描述。
alerts = alert_api.get_alerts(
  {
    "where": {
      "cluster": {
        "id": "cluster_id"
      }
    },
    "first": 100
  },
  content_language="zh-CN"
)
其他
发送异步请求
上述请求的发送都是同步的请求,会堵塞当前进程。如果需要使用异步请求,请在对应请求的关键字参数中加上
async_req=True。 通过返回结果ApplyResult.get()来获取对应的结果。
vms = vm_api.get_vms(
  {
    "where": {
      "id": "vm_id"
    }
  },
  async_req=True
)
print(vms.get()[0].name)
使用完成后销毁 ApiClient 实例
client.close()
场景示例
虚拟机备份
from cloudtower import ApiClient
from cloudtower.api.vm_api import VmApi
from cloudtower.api.vm_snapshot_api import VmSnapshotApi
from cloudtower.api.iscsi_lun_snapshot_api import IscsiLunSnapshotApi
from cloudtower.models import (
    ConsistentType,
    VmToolsStatus
)
from cloudtower.utils import wait_tasks
def create_vm_snapshot(
    api_client: ApiClient,
    target_vm_name: str,
    target_snapshot_name: str,
    consistent_type: ConsistentType
):
    vm_api = VmApi(api_client)
    vm_snapshot_api = VmSnapshotApi(api_client)
    iscsi_lun_snapshot_api = IscsiLunSnapshotApi(api_client)
    # 1. 获取所需备份的虚拟机的信息,这里我们需要vm的id来构建创建snapshot的参数
    vm = vm_api.get_vms({
        "where": {
            "name": target_vm_name
        },
        "first": 1
    })
    # vm 已安装并启动 VMTools 时,consistent_type 可以使用 FILE_SYSTEM_CONSISTENT 代表文件系统一致性快照
    if vm.vm_tools_status != VmToolsStatus.RUNNING and consistent_type == ConsistentType.FILE_SYSTEM_CONSISTENT:
        consistent_type = ConsistentType.CRASH_CONSISTENT
    # 2. 创建虚拟机快照
    snapshots_with_task = vm_snapshot_api.create_vm_snapshot({
        "data": [
            {
                "vm_id": vm.id,
                "name": target_snapshot_name,
                "consistent_type": consistent_type
            }
        ]
    })
    # 3. 等待Task完成
    wait_tasks([snapshots_with_task[0].task_id], api_client)
    # 4. 根据返回的id查询生成的虚拟机快照
    snapshot = vm_snapshot_api.get_vm_snapshots({
        "where": {
            "id": snapshots_with_task.data.id
        }
    })[0]
    # 5. 根据返回的snapshot中的vm_disks包含了快照的虚拟盘信息
    # type 为 DISK 表示对应一个卷,其中会包含一个 snapshot_local_id 则表示该虚拟卷对应的lun快照的 local_id
    # type 为 CD-ROM则代表为被挂载的CD-ROM,不会产生lun快照
    lun_snapshot_ids = []
    for disk in snapshot.vm_disks:
        if disk.type == "DISK":
            lun_snapshot_ids.append(disk.snapshot_local_id)
    lun_snapshots = iscsi_lun_snapshot_api.get_iscsi_lun_snapshots({
        "where": {
            "name_in": lun_snapshot_ids
        }
    })
    return {
        "vm_snapshot": snapshot,
        "lun_snapshots": lun_snapshots
    }
Dashboard 构建
定义工具方法
from functools import reduce
from datetime import datetime, timedelta
from cloudtower import ApiClient
from cloudtower.configuration import Configuration
from cloudtower.models import SeverityEnum, ClusterType, Hypervisor, DiskType, DiskUsageStatus, DiskHealthStatus
from cloudtower.api import VmApi, ClusterApi, AlertApi, HostApi, DiskApi, ClusterSettingsApi, GlobalSettingsApi
api_client = ApiClient(Configuration(host="http://192.168.96.133/v2/api"))
byte_units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"]
hz_units = ["Hz", "KHz", "MHz", "GHz", "THz"]
def format_unit(base: int, units, step=1024):
    if not len(units):
        raise Exception("no unit provided")
    if base <= 0:
        return "0" + units[0]
    for unit in units:
        if base < step:
            return "{:.2f}{}".format(base, unit)
        base /= step
    return "{:.2f}{}".format(base, units[-1])
构建报警信息
def build_alerts(api_client: ApiClient, cluster_ids):
    alert_api = AlertApi(api_client)
    alerts = alert_api.get_alerts({
        "where": {
            "ended": False,
            "cluster": {
                "id_in": cluster_ids
            },
        }
    })
    critial_alerts = [
        alert for alert in alerts if alert.severity == SeverityEnum.CRITICAL]
    notice_alerts = [
        alert for alert in alerts if alert.severity == SeverityEnum.NOTICE]
    info_alerts = [
        alert for alert in alerts if alert.severity == SeverityEnum.INFO]
    return {
        "critical": critial_alerts,
        "notice": notice_alerts,
        "info": info_alerts
    }
构建硬盘信息
这里以机械硬盘为例
def build_hdd_info(api_client: ApiClient, cluster_ids):
    disk_api = DiskApi(api_client)
    disks = disk_api.get_disks({
        "where": {
            "host": {
                "cluster": {
                    "id_in": cluster_ids
                }
            }
        }
    })
    hdd = {
        "healthy": 0,
        "warning": 0,
        "error": 0,
        "total": 0,
    }
    for disk in disks:
        if disk.type == DiskType.HDD:
            if disk.health_status in [DiskHealthStatus.UNHEALTHY, DiskHealthStatus.SUBHEALTHY, DiskHealthStatus.SMART_FAILED]:
                hdd['error'] += 1
            elif disk.usage_status in [DiskUsageStatus.UNMOUNTED, DiskUsageStatus.PARTIAL_MOUNTED]:
                hdd['warning'] += 1
            else:
                hdd['healthy'] += 1
            hdd['total'] += 1
    return hdd
构建性能指标
获取指定集群的 CPU 核数,CPU 频率总数,CPU 使用率,内存总量,内存使用量,存储资源总量,存储资源已使用量,存储资源失效量与存储资源可用量。
def build_metrics(api_client: ApiClient, clusters, cluster_ids):
    result = {}
    host_api = HostApi(api_client)
    hosts = host_api.get_hosts({
        "where": {
            "cluster": {
                "id_in": cluster_ids
            }
        }
    })
    cpu = {
        "total_cpu_cores": 0,
        "total_cpu_hz": 0,
        "used_cpu_hz": 0,
    }
    memory = {
        "total_memory": 0,
        "used_memory": 0,
    }
    storage = {
        "total": 0,
        "used": 0,
        "invalid": 0,
        "available": 0
    }
    for host in hosts:
        cluster = next(
            cluster for cluster in clusters if cluster.id == host.cluster.id)
        if cluster.hypervisor == Hypervisor.ELF:
            memory['total_memory'] += 0 if host.total_memory_bytes is None else host.total_memory_bytes
            memory['used_memory'] += (0 if host.running_pause_vm_memory_bytes is None else host.running_pause_vm_memory_bytes) + \
                (0 if host.os_memory_bytes is None else host.os_memory_bytes)
    for cluster in clusters:
        if cluster.type == ClusterType.SMTX_OS:
            cpu["total_cpu_cores"] += 0 if cluster.total_cpu_cores is None else cluster.total_cpu_cores
            cpu["total_cpu_hz"] += 0 if cluster.total_cpu_hz is None else cluster.total_cpu_hz
            cpu["used_cpu_hz"] += 0 if cluster.used_cpu_hz is None else cluster.used_cpu_hz
            if cluster.hypervisor == Hypervisor.VMWARE:
                memory["total_memory"] += 0 if cluster.total_memory_bytes is None else cluster.total_memory_bytes
                memory["used_memory"] += 0 if cluster.used_memory_bytes is None else cluster.used_memory_bytes
        storage["total"] += 0 if cluster.total_data_capacity is None else cluster.total_data_capacity
        storage["used"] += 0 if cluster.used_data_space is None else cluster.used_data_space
        storage["invalid"] += 0 if cluster.failure_data_space is None else cluster.failure_data_space
    if len([cluster for cluster in clusters if cluster.type != ClusterType.SMTX_ZBS]) > 1:
        cpu["cpu_usage"] = "{:.2f}%".format(
            cpu["used_cpu_hz"] / cpu["total_cpu_hz"])
        cpu["total_cpu_hz"] = format_unit(cpu["total_cpu_hz"], hz_units, 1000)
        cpu["used_cpu_hz"] = format_unit(cpu["used_cpu_hz"], hz_units, 1000)
        result['cpu'] = cpu
        memory["memory_usage"] = "{:.2f}%".format(
            memory["used_memory"] / memory["total_memory"])
        memory["total_memory"] = format_unit(
            memory["total_memory"], byte_units)
        memory["used_memory"] = format_unit(
            memory["used_memory"], byte_units)
        result["memory"] = memory
    storage["available"] = format_unit(
        storage["total"] - storage["used"] - storage["invalid"], byte_units)
    storage["total"] = format_unit(storage["total"], byte_units)
    storage["used"] = format_unit(storage["used"], byte_units)
    storage["invalid"] = format_unit(storage["invalid"], byte_units)
    result["storage"] = storage
    return result
构建 Dashboard
def build_dashboard(api_client: ApiClient, datacenter_id: str = None, cluster_id: str = None):
    result = {}
    cluster_api = ClusterApi(api_client)
    clusters = cluster_api.get_clusters({
        "where": {"id": cluster_id} if cluster_id is not None else {"datacenters_some": {"id": datacenter_id}} if datacenter_id is not None else None
    })
    cluster_ids = [cluster.id for cluster in clusters]
    result["alerts"] = build_alerts(api_client, cluster_ids)
    result["hdd"] = build_hdd_info(api_client, cluster_ids)
    metric = build_metrics(api_client, clusters, cluster_ids)
    if "cpu" in metric:
        result["cpu"] = metric["cpu"]
    if "memory" in metric:
        result["memory"] = metric["memory"]
    if "storage" in metric:
        result["storage"] = metric["storage"]
    return result