JumpServer V2 阿里云资产自动同步方案
一、实施要点
1.1 资产模板(create_assets 中的 template)里的 nodes 字段值需替换为实际环境的节点 ID,可通过查询数据库 assets_node 表,或调用 JumpServer API 接口获取资产信息得到。
1.2 依赖库版本要求:
aliyun-python-sdk-core==2.13.36
aliyun-python-sdk-ecs==4.24.30
aliyunsdkcore==1.0.3
requests==2.28.2
urllib3==1.26.14
PyMySQL~=1.0.2
二、核心组件实现
1. 阿里云ECS服务模块
# -*- coding: utf-8 -*-
import json
from aliyunsdkcore.client import AcsClient
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeDisksRequest import DescribeDisksRequest
class EcsService:
    """Small wrapper around the Alibaba Cloud ECS API calls used by the sync job.

    Args:
        region: Region id passed to ``AcsClient``.
        access_key: AccessKey id. The default is a placeholder and must be
            replaced (or overridden by the caller) before real use.
        secret_key: AccessKey secret. Same caveat as ``access_key``.
        page_size: Number of instances requested per DescribeInstances page.
    """

    # Device name historically used to recognise the system disk; kept as a
    # fallback for responses that do not carry a ``Type`` field.
    SYSTEM_DISK_DEVICE = "/dev/xvda"

    def __init__(self, region="cn-hangzhou", access_key="xxxxxxx",
                 secret_key="xxxxxxx", page_size=100):
        self.region = region
        self.access_key = access_key
        self.secret_key = secret_key
        self.page_size = page_size
        self.client = AcsClient(self.access_key, self.secret_key, self.region)

    def fetch_ecs_instances(self):
        """Return every ECS instance visible in the configured region.

        Pages through DescribeInstances until a page comes back empty or
        shorter than ``page_size``.

        Returns:
            A list of instance dicts exactly as returned by the API.
        """
        request = DescribeInstancesRequest()
        request.set_accept_format('json')
        request.set_PageSize(self.page_size)

        instances = []
        page_number = 1
        while True:
            request.set_PageNumber(page_number)
            response = self.client.do_action_with_exception(request)
            payload = json.loads(response)
            page = payload.get('Instances', {}).get('Instance') or []
            instances.extend(page)
            # A short (or empty) page is the last one; stopping here avoids
            # one extra request that would only return nothing.
            if len(page) < self.page_size:
                break
            page_number += 1
        return instances

    def get_disk_capacity(self, instance_id):
        """Return the system disk size of an instance as a string such as ``'40GB'``.

        Args:
            instance_id: ECS instance id to query disks for.

        Returns:
            ``'<size>GB'`` for the system disk, or ``'0GB'`` when none is found.
        """
        request = DescribeDisksRequest()
        request.set_accept_format('json')
        request.set_InstanceId(instance_id)
        response = self.client.do_action_with_exception(request)
        disks = json.loads(response).get('Disks', {}).get('Disk') or []
        return self._system_disk_size(disks)

    @staticmethod
    def _system_disk_size(disks):
        """Pick the system disk out of a DescribeDisks ``Disk`` list.

        A disk counts as the system disk when its ``Type`` is ``'system'`` or,
        failing that, when its ``Device`` equals the legacy ``/dev/xvda`` name.
        """
        for disk in disks or []:
            is_system = (
                disk.get('Type') == 'system'
                or disk.get('Device') == EcsService.SYSTEM_DISK_DEVICE
            )
            if is_system:
                return f"{disk['Size']}GB"
        return '0GB'
2. JumpServer接口模块
# -*- coding: utf-8 -*-
import requests
import json
from jms_db import get_asset_id
from aliyun_ecs import EcsService
class JumpServerClient:
    """Synchronises JumpServer assets with a list of Alibaba Cloud ECS instances.

    Assets and instances are matched by IP: the asset's ``ip`` field against
    the primary IP of the instance's first network interface.

    Args:
        base_url: Root URL of the JumpServer instance.
        auth_token: API token sent in the ``Authorization`` header. The
            default is the value embedded in this script; pass your own
            rather than keeping a real token in source control.
    """

    # Seconds to wait on each HTTP call so an unreachable server cannot hang
    # the whole sync run.
    REQUEST_TIMEOUT = 30

    def __init__(self, base_url='http://192.168.1.136',
                 auth_token='012cddf58f6f89f32631c9a7d49e8991b34a8c71'):
        self.base_url = base_url
        self.auth_token = auth_token
        self.headers = {
            "Authorization": f'Token {self.auth_token}',
            'X-JMS-ORG': '00000000-0000-0000-0000-000000000002',
            'accept': 'application/json'
        }
        self.assets_endpoint = f'{self.base_url}/api/v1/assets/assets/'
        self.ecs_service = EcsService()

    @staticmethod
    def _primary_ip(ecs):
        """Return the primary IP of the first network interface, or None if absent."""
        interfaces = (ecs.get('NetworkInterfaces') or {}).get('NetworkInterface') or []
        if not interfaces:
            return None
        return interfaces[0].get('PrimaryIpAddress')

    def fetch_assets(self):
        """Fetch the asset list from JumpServer.

        Returns:
            A list of asset dicts. If the server answers with a dict (for
            example a paginated envelope), its ``results`` entry is returned.

        Raises:
            requests.HTTPError: if the server responds with an error status.
        """
        response = requests.get(
            self.assets_endpoint, headers=self.headers, timeout=self.REQUEST_TIMEOUT
        )
        # Fail loudly here instead of letting an error body reach compare_assets.
        response.raise_for_status()
        payload = response.json()
        if isinstance(payload, dict):
            return payload.get('results', [])
        return payload

    def compare_assets(self, assets_list, ecs_list):
        """Compare JumpServer assets with ECS instances by IP.

        Args:
            assets_list: Asset dicts, each with an ``ip`` key.
            ecs_list: ECS instance dicts as returned by DescribeInstances.

        Returns:
            A tuple ``(in_jms, not_in_jms, not_in_ecs)`` of IP sets: present in
            both, only in ECS, only in JumpServer. Instances without a primary
            IP are ignored.
        """
        jms_ip_set = {asset['ip'] for asset in assets_list}
        ecs_ip_set = {ip for ip in map(self._primary_ip, ecs_list) if ip}
        in_jms = ecs_ip_set & jms_ip_set
        not_in_jms = ecs_ip_set - jms_ip_set
        not_in_ecs = jms_ip_set - ecs_ip_set
        return in_jms, not_in_jms, not_in_ecs

    def create_assets(self, ecs_list, ip_list):
        """Create a JumpServer asset for every ECS instance whose IP is in ``ip_list``.

        Args:
            ecs_list: ECS instance dicts.
            ip_list: Collection of IPs that are missing from JumpServer.
        """
        template = {
            'id': '', 'hostname': '', 'ip': '', 'platform': 'Linux',
            'protocols': ['ssh/34518'], 'is_active': True,
            'public_ip': None, 'number': None, 'comment': '',
            'vendor': None, 'model': None, 'sn': None,
            'cpu_model': None, 'cpu_count': None, 'cpu_cores': None,
            'cpu_vcpus': None, 'memory': None, 'disk_total': None,
            'disk_info': None, 'os': None, 'os_version': None,
            'os_arch': None, 'hostname_raw': None, 'cpu_info': '',
            'hardware_info': '', 'domain': None, 'admin_user': None,
            'admin_user_display': '', 'nodes': ['b0506981-d3e0-4e28-ba83-968c2521df6b'],
            'nodes_display': ['/Default'], 'labels': [], 'labels_display': [],
            'connectivity': 'failed', 'date_verified': '', 'created_by': '',
            'date_created': '', 'org_id': '00000000-0000-0000-0000-000000000002',
            'org_name': 'Default'
        }
        new_assets = []
        for ecs in ecs_list:
            ip = self._primary_ip(ecs)
            if not ip or ip not in ip_list:
                continue
            # Build a fresh payload per instance so nothing can leak from one
            # asset into the next through the shared template.
            payload = dict(template)
            payload.update({
                'hostname': f"{ecs['InstanceName']}-{ip}",
                'ip': ip,
                'memory': f"{ecs['Memory']/1024}GB",
                'cpu_vcpus': ecs['Cpu'],
                'platform': ecs['OSType'].capitalize(),
                'os': ecs['OSName'],
                'cpu_count': ecs['CpuOptions']['ThreadsPerCore'],
                'cpu_cores': ecs['CpuOptions']['CoreCount'],
                'disk_total': self.ecs_service.get_disk_capacity(ecs['InstanceId']),
            })
            # Sent as a form body (data=), not JSON.
            # NOTE(review): confirm the API accepts both before switching to json=.
            response = requests.post(
                self.assets_endpoint, headers=self.headers, data=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code != 201:
                print(f"资产创建失败: {payload}")
            else:
                new_assets.append(ip)
        if new_assets:
            print(f"新增资产数量: {len(new_assets)}")
            print(f"新增IP列表: {new_assets}")

    def _collect_updates(self, ecs_list, assets_list, exist_ips):
        """Return the assets whose hostname/memory/vCPU/disk differ from ECS.

        Matching assets are updated in place and each appears at most once in
        the returned list, however many of its fields changed.
        """
        # Index both sides by IP once; setdefault keeps the first match.
        ecs_by_ip = {}
        for ecs in ecs_list:
            ip = self._primary_ip(ecs)
            if ip:
                ecs_by_ip.setdefault(ip, ecs)
        assets_by_ip = {}
        for asset in assets_list:
            assets_by_ip.setdefault(asset['ip'], asset)

        updated = []
        for ip in exist_ips:
            ecs_info = ecs_by_ip.get(ip)
            asset_info = assets_by_ip.get(ip)
            if not ecs_info or not asset_info:
                continue
            desired = {
                'hostname': f"{ecs_info['InstanceName']}-{ip}",
                'memory': f"{ecs_info['Memory']/1024}GB",
                'cpu_vcpus': ecs_info['Cpu'],
                # One disk query per asset; the result is reused for both the
                # comparison and the assignment.
                'disk_total': self.ecs_service.get_disk_capacity(ecs_info['InstanceId']),
            }
            changed = False
            for field, value in desired.items():
                if asset_info.get(field) != value:
                    asset_info[field] = value
                    changed = True
            if changed:
                updated.append(asset_info)
        return updated

    def update_assets(self, ecs_list, assets_list, exist_ips):
        """Push changed hostname/memory/vCPU/disk values to JumpServer.

        Args:
            ecs_list: ECS instance dicts.
            assets_list: Asset dicts previously fetched from JumpServer.
            exist_ips: IPs present on both sides.
        """
        updated = self._collect_updates(ecs_list, assets_list, exist_ips)
        if not updated:
            return
        # All changed assets go out in a single PUT to the list endpoint.
        response = requests.put(
            self.assets_endpoint, headers=self.headers, json=updated,
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            print("资产更新失败")
        else:
            print(f"更新资产数量: {len(updated)}")
            print(f"更新IP列表: {','.join([a['ip'] for a in updated])}")

    def delete_assets(self, ip_list):
        """Delete the JumpServer assets whose IPs are listed in ``ip_list``.

        An IP whose asset id cannot be resolved is reported as a failed
        deletion and skipped, so one missing row does not abort the run.
        """
        deleted = []
        for ip in ip_list:
            try:
                asset_id = get_asset_id(ip)
            except TypeError:
                # Raised by a lookup that indexes a missing row (None).
                asset_id = None
            if not asset_id:
                print(f"删除失败: {ip}")
                continue
            url = f"{self.assets_endpoint}{asset_id}/"
            response = requests.delete(
                url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )
            if response.status_code == 204:
                deleted.append(ip)
            else:
                print(f"删除失败: {ip}")
        if deleted:
            print(f"已删除资产数量: {len(deleted)}")
            print(f"删除IP列表: {deleted}")
3. 数据库操作模块
# -*- coding: utf-8 -*-
import pymysql
def get_asset_id(ip):
    """Look up the id of the JumpServer asset with the given IP.

    Queries the ``assets_asset`` table directly with a parameterised
    statement. The connection settings are hard-coded below; adjust them for
    your environment and avoid keeping real credentials in source control.

    Args:
        ip: IP address stored in the asset's ``ip`` column.

    Returns:
        The ``id`` column of the first matching row, or ``None`` when no asset
        has that IP.
    """
    conn = pymysql.connect(
        host='192.168.1.135',
        port=3306,
        user='root',
        password='Qwer@123',
        database='jumpserver'
    )
    # try/finally so the cursor and connection are released even when the
    # query raises.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("SELECT id FROM assets_asset WHERE ip = %s", (ip,))
            row = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()
    # fetchone() yields None when nothing matched; do not index into it.
    return row[0] if row else None
4. 主程序逻辑
# -*- coding: utf-8 -*-
from aliyun_ecs import EcsService
from jumpserver import JumpServerClient
def main():
    """Run one sync pass: create missing assets, update existing, delete orphans."""
    cloud_service = EcsService()
    jumpserver_client = JumpServerClient()

    # Collect the data from both sides.
    ecs_instances = cloud_service.fetch_ecs_instances()
    if not ecs_instances:
        # With an empty ECS list every JumpServer asset would be classified as
        # orphaned and deleted; stop instead of wiping the inventory.
        print("未获取到任何ECS实例,已中止同步以避免误删JumpServer资产")
        return
    existing_assets = jumpserver_client.fetch_assets()

    # Work out which IPs exist on both sides, only in ECS, or only in JumpServer.
    exist_ips, missing_ips, orphaned_ips = jumpserver_client.compare_assets(
        existing_assets, ecs_instances
    )

    # Apply the three sync steps.
    if missing_ips:
        jumpserver_client.create_assets(ecs_instances, missing_ips)
    if exist_ips:
        jumpserver_client.update_assets(ecs_instances, existing_assets, exist_ips)
    if orphaned_ips:
        jumpserver_client.delete_assets(orphaned_ips)


if __name__ == '__main__':
    main()