Forum Discussion

mikenelson-pure's avatar
mikenelson-pure
Community Manager
2 months ago

Some Fleet Python code using the requests module

This is the Python sister script to the posted PowerShell version.
This script uses the Python requests module to perform its tasks; it does not use the Pure Storage pyClient. It is meant for folks who make raw API calls from automation packages, runbooks, and scripts. It is not intended to be used in its entirety, but rather as a source of code snippets and starters for your own scripts. The full script is available in this GitHub repository.
This script will:

  • Use native Python calls to the FlashArray API
  • Authenticate an API token user and get the x-auth-token for subsequent requests
  • Query a fleet and determine the fleet members
  • Query fleet Presets & Workloads
  • List fleet volumes and hosts (top X, configurable)
  • Create a host, volume, and then connect the volume to the host on a member array.
#!/usr/bin/env python3
import json
import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Set target array address
target = "10.0.0.10"

# API version to use
# TODO: dynamically get the latest version
latest_api_version = "2.45"

# Authenticate and get x-auth-token.
# The API token is exchanged once at /login; the session token comes back in
# the "x-auth-token" response header and is what later requests present.
api_token = "<your_api_token_here>"
session = requests.Session()
login_url = f'https://{target}/api/{latest_api_version}/login'

# Send the long-lived API token on the login call only, instead of storing it
# in the session headers where it would ride along on every later request.
# verify=False skips TLS certificate validation for this call.
response = session.post(
    login_url,
    headers={"api-token": api_token},
    verify=False,
)
x_auth_token = response.headers.get("x-auth-token")
if not x_auth_token:
    # Without a session token every following call would fail, so stop here
    # with a clear message rather than continuing unauthenticated.
    raise SystemExit(
        "Error: x-auth-token not found in response headers "
        f"(HTTP {response.status_code})."
    )

session.headers.update({"x-auth-token": x_auth_token})
# The token itself is a credential, so it is deliberately not echoed.
print("Authenticated: x-auth-token received.")

Query the fleet:

# Get fleet info
fleets_url = f'https://{target}/api/{latest_api_version}/fleets'
fleets_response = session.get(fleets_url, verify=False)
fleets_json = fleets_response.json()

# An error response or an array that is not in a fleet yields no 'items';
# stop with a readable message instead of a KeyError/IndexError.
fleets = fleets_json.get('items') or []
if not fleets:
    raise SystemExit(
        f"Error: no fleet returned by {fleets_url} "
        f"(HTTP {fleets_response.status_code})."
    )

# The first fleet in the response is the one used for the rest of the script.
fleet_name = fleets[0]['name']
print(f"Selected fleet: {fleet_name}")

# Get fleet members
# The fleet name is passed through `params` so requests URL-encodes it.
# NOTE(review): the query key 'fleet_name' is kept exactly as in the original
# script; confirm the spelling against the REST API reference.
members_url = f"https://{target}/api/{latest_api_version}/fleets/members"
fleets_response_members = session.get(
    members_url,
    params={'fleet_name': fleet_name},
    verify=False,
)
VAR1 = fleets_response_members.json()

# VAR_RESULTS holds the member names; the later snippets read it to build
# their context_names values, so the name is kept as is.
VAR_RESULTS = [item['member']['name'] for item in VAR1.get('items', [])]
print(f"Fleet members: {VAR_RESULTS}")

Enumerate volumes and hosts with pagination:

def _list_all(resource, context_names, limit=10):
    """Return every item of a fleet-wide collection, following pagination.

    Args:
        resource: Collection path segment under the API root, e.g.
            ``"volumes"`` or ``"hosts"``.
        context_names: Names sent (comma-joined) as the ``context_names``
            query parameter.
        limit: Page size requested per call.

    Returns:
        A list with the ``items`` of every page, in the order received.
    """
    url = f"https://{target}/api/{latest_api_version}/{resource}"
    query = {'context_names': ','.join(context_names), 'limit': limit}
    collected = []
    while True:
        page = session.get(url, params=query, verify=False)
        collected.extend(page.json().get('items', []))
        # The token for the next page is read from the 'x-next-token'
        # response header; no header means this was the last page.
        next_token = page.headers.get('x-next-token')
        # A repeated token would request the same page forever, so treat it
        # as the end of the listing too.
        if not next_token or next_token == query.get('continuation_token'):
            return collected
        query['continuation_token'] = next_token


limit = 10  # Page size per request; adjust as needed

# Enumerate all volumes in the fleet with pagination
print("\nEnumerating volumes in the fleet:")
all_volumes = _list_all('volumes', VAR_RESULTS, limit)
print(f"Total volumes found: {len(all_volumes)}")
print(json.dumps(all_volumes, indent=2))

# Enumerate hosts in the fleet with pagination
print("\nEnumerating hosts in the fleet:")
all_hosts = _list_all('hosts', VAR_RESULTS, limit)
print(f"Total hosts found: {len(all_hosts)}")
print(json.dumps(all_hosts, indent=2))

Enumerate Presets and Workloads in the fleet:

# Both queries below share the same API root and the same fleet-wide scope
# (every member name, comma-joined, as context_names).
api_root = f"https://{target}/api/{latest_api_version}"
fleet_context = ','.join(VAR_RESULTS)

# Count the Presets visible across the fleet
print("\nEnumerating presets in the fleet:")
presets_url = f"{api_root}/presets?context_names={fleet_context}"
presets_response = session.get(presets_url, verify=False)
presets_json = presets_response.json()
preset_items = presets_json.get('items', [])
print(f"Total presets found: {len(preset_items)}")

# Count the Workloads visible across the fleet
print("\nEnumerating workloads in the fleet:")
workloads_url = f"{api_root}/workloads?context_names={fleet_context}"
workloads_response = session.get(workloads_url, verify=False)
workloads_json = workloads_response.json()
workload_items = workloads_json.get('items', [])
print(f"Total workloads found: {len(workload_items)}")

Create a host on a member array, create a volume, and connect the volume to the host:

# Select a member array (not the target): take the first fleet member whose
# name differs from `target`.
# NOTE(review): `target` is assigned an address earlier in the script while
# VAR_RESULTS holds member names, so this filter only excludes the target when
# the two values match exactly - confirm this picks the array you intend.
member_array = next((name for name in VAR_RESULTS if name != target), None)
if not member_array:
    print("No other member array found in the fleet.")
    # `exit()` is an interactive helper injected by the `site` module and is
    # not guaranteed to exist; raising SystemExit always works.
    raise SystemExit(1)
print(f"Selected member array for operations: {member_array}")

# Create a new host on the member array
host_name = "demo-host-01"
host_iqn = "iqn.2025-08.com.fleetdemo:host01"

# Request body: host attributes only. The REST 2.x host schema names the
# iSCSI initiator list "iqns" (plural).
host_payload = {
    "iqns": [host_iqn],
}
# The object name and the fleet member to act on are query parameters in
# REST 2.x, not body fields.
host_params = {
    "names": host_name,
    "context_names": member_array,
}
hosts_url = f"https://{target}/api/{latest_api_version}/hosts"
host_resp = session.post(
    hosts_url,
    params=host_params,
    json=host_payload,
    verify=False,
)
print(f"Host creation response: {host_resp.json()}")

# Create a new volume on the member array
volume_name = "APIDemo-vol1"

# Request body: volume attributes only.
volume_payload = {
    "provisioned": 10737418240,  # 10 GiB in bytes (10 * 1024**3)
}
# The object name and the fleet member to act on are query parameters in
# REST 2.x, not body fields.
volume_params = {
    "names": volume_name,
    "context_names": member_array,
}
volumes_url = f"https://{target}/api/{latest_api_version}/volumes"
volume_resp = session.post(
    volumes_url,
    params=volume_params,
    json=volume_payload,
    verify=False,
)
print(f"Volume creation response: {volume_resp.json()}")

# Connect the volume to the host
# In REST 2.x the host, the volume and the fleet member are all identified by
# query parameters. No connection attributes are set here, so no request body
# is sent.
connect_params = {
    "host_names": host_name,
    "volume_names": volume_name,
    "context_names": member_array,
}
connections_url = f"https://{target}/api/{latest_api_version}/connections"
connect_resp = session.post(
    connections_url,
    params=connect_params,
    verify=False,
)
print(f"Connection response: {connect_resp.json()}")

 

No RepliesBe the first to reply