Automation Playbooks
Use these production-ready patterns to automate scans, Kubernetes/container checks, configuration, reports, schedules, webhooks, and MCP tool flows with the current API v1 contract.
These playbooks assume HTTPS local API with self-signed TLS and bearer-token authentication.
Docs > API & Automation > API Playbooks
Use this page after the API Reference when you need runnable automation patterns and operational runbooks.
Preflight: Runtime and Auth Baseline
Set base URL and token once, then verify runtime status and auth gate.
export CWS_BASE_URL="https://192.168.1.4:9098"
export CWS_TOKEN="YOUR_API_TOKEN"
# Public route
curl -k "$CWS_BASE_URL/status"
# Authenticated route
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/meta/capabilities"
import json
import ssl
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def request_json(path, method="GET", payload=None, auth=False):
headers = {}
body = None
if auth:
headers["Authorization"] = f"Bearer {TOKEN}"
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=20) as resp:
text = resp.read().decode("utf-8")
print(path, resp.status, text)
return json.loads(text) if text else {}
request_json("/status")
request_json("/v1/meta/capabilities", auth=True)
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function requestJson(path, method = "GET", payload = null, auth = false) {
const headers = {};
if (auth) headers.Authorization = `Bearer ${TOKEN}`;
if (payload !== null) headers["Content-Type"] = "application/json";
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers,
body: payload !== null ? JSON.stringify(payload) : undefined
});
const text = await res.text();
console.log(path, res.status, text);
return text ? JSON.parse(text) : {};
}
await requestJson("/status");
await requestJson("/v1/meta/capabilities", "GET", null, true);
})();
Playbook 1: Normalize Notification Policy State
Set a valid notification trigger mode before automation patches. Accepted values are scan_complete and waste_only.
# Read current policy
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/notifications/policy"
# Normalize to a valid mode
curl -k -X PATCH "$CWS_BASE_URL/v1/notifications/policy" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"notification_trigger_mode":"scan_complete"}'
import json
import ssl
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=20) as resp:
text = resp.read().decode("utf-8")
print(resp.status, text)
return json.loads(text) if text else {}
api("/v1/notifications/policy")
api("/v1/notifications/policy", "PATCH", {"notification_trigger_mode": "scan_complete"})
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
console.log(res.status, text);
return text ? JSON.parse(text) : {};
}
await api("/v1/notifications/policy");
await api("/v1/notifications/policy", "PATCH", { notification_trigger_mode: "scan_complete" });
})();
Playbook 2: Account Selection and Connectivity Test
List account IDs for targeting and run credential tests before starting scan jobs.
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/accounts"
curl -k -X POST "$CWS_BASE_URL/v1/cloud-accounts/test" -H "Authorization: Bearer $CWS_TOKEN" -H "Content-Type: application/json" -d '{
"provider": "unsupported-provider",
"credentials": "{\"foo\":\"bar\"}"
}'
import json
import ssl
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=20) as resp:
text = resp.read().decode("utf-8")
print(resp.status, text)
return json.loads(text) if text else {}
accounts = api("/v1/accounts")
print("accounts:", accounts)
test_result = api(
"/v1/cloud-accounts/test",
"POST",
{"provider": "unsupported-provider", "credentials": "{\"foo\":\"bar\"}"}
)
print("test_result:", test_result)
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
console.log(res.status, text);
return text ? JSON.parse(text) : {};
}
const accounts = await api("/v1/accounts");
console.log("accounts:", accounts);
const testResult = await api("/v1/cloud-accounts/test", "POST", {
provider: "unsupported-provider",
credentials: "{\"foo\":\"bar\"}"
});
console.log("testResult:", testResult);
})();
Note: unsupported-provider currently validates format only and can return ok=true with an implementation message.
Playbook 3: Trigger Scan and Poll to Completion
Create async scan job, poll status, and stop when terminal state is reached.
Full scan: call POST /v1/scans with no body (or {}). The examples below show a targeted run with selected_accounts.
curl -k -X POST "$CWS_BASE_URL/v1/scans" -H "Authorization: Bearer $CWS_TOKEN"scan_id=$(curl -k -X POST "$CWS_BASE_URL/v1/scans" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"selected_accounts":["profile_abc123"],"report_emails":["service@cloud-waste-scanner.com"]}' \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["scan_id"])')
echo "scan_id=$scan_id"
while true; do
status=$(curl -k -H "Authorization: Bearer $CWS_TOKEN" \
"$CWS_BASE_URL/v1/scans/$scan_id" \
| python3 -c 'import sys,json; print(json.load(sys.stdin).get("status",""))')
echo "status=$status"
if [ "$status" = "completed" ] || [ "$status" = "failed" ] || [ "$status" = "canceled" ]; then
break
fi
sleep 2
done
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/scans/$scan_id"
import json
import ssl
import time
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
text = resp.read().decode("utf-8")
return json.loads(text) if text else {}
created = api("/v1/scans", "POST", {
"selected_accounts": ["profile_abc123"],
"report_emails": ["service@cloud-waste-scanner.com"]
})
scan_id = created["scan_id"]
print("scan_id =", scan_id)
while True:
detail = api(f"/v1/scans/{scan_id}")
status = detail.get("status", "")
print("status =", status)
if status in {"completed", "failed", "canceled"}:
break
time.sleep(2)
print(json.dumps(detail, indent=2))
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
return text ? JSON.parse(text) : {};
}
const created = await api("/v1/scans", "POST", {
selected_accounts: ["profile_abc123"],
report_emails: ["service@cloud-waste-scanner.com"]
});
const scanId = created.scan_id;
console.log("scan_id =", scanId);
let detail = {};
while (true) {
detail = await api(`/v1/scans/${scanId}`);
const status = detail.status || "";
console.log("status =", status);
if (["completed", "failed", "canceled"].includes(status)) break;
await new Promise((resolve) => setTimeout(resolve, 2000));
}
console.log(JSON.stringify(detail, null, 2));
})();
Playbook 4: Generate and Download Report Artifacts
Generate report artifacts through API and download by report_id.
report_id=$(curl -k -X POST "$CWS_BASE_URL/v1/reports/generate" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"format":"pdf","include_esg":true}' \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["report_id"])')
echo "report_id=$report_id"
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/reports/$report_id"
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/reports/$report_id/download" -o "report_${report_id}.pdf"
import json
import ssl
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None, raw=False):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
data = resp.read()
if raw:
return data
return json.loads(data.decode("utf-8")) if data else {}
generated = api("/v1/reports/generate", "POST", {"format": "pdf", "include_esg": True})
report_id = generated["report_id"]
print("report_id =", report_id)
metadata = api(f"/v1/reports/{report_id}")
print(json.dumps(metadata, indent=2))
pdf_bytes = api(f"/v1/reports/{report_id}/download", raw=True)
out_file = f"report_{report_id}.pdf"
with open(out_file, "wb") as f:
f.write(pdf_bytes)
print("saved:", out_file)
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const fs = require("node:fs");
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null, raw = false) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
if (raw) return Buffer.from(await res.arrayBuffer());
const text = await res.text();
return text ? JSON.parse(text) : {};
}
const generated = await api("/v1/reports/generate", "POST", { format: "pdf", include_esg: true });
const reportId = generated.report_id;
console.log("report_id =", reportId);
const metadata = await api(`/v1/reports/${reportId}`);
console.log(JSON.stringify(metadata, null, 2));
const pdfBytes = await api(`/v1/reports/${reportId}/download`, "GET", null, true);
const outFile = `report_${reportId}.pdf`;
fs.writeFileSync(outFile, pdfBytes);
console.log("saved:", outFile);
})();
Playbook 5: Recurring Schedule plus Run-Now
Create persistent schedule payload once and trigger immediate runs when needed.
schedule_id=$(curl -k -X POST "$CWS_BASE_URL/v1/schedules" -H "Authorization: Bearer $CWS_TOKEN" -H "Content-Type: application/json" -d '{
"name":"nightly-cost-scan",
"enabled":true,
"run_at":1762473600,
"interval_minutes":1440,
"timezone":"UTC",
"scan":{"selected_accounts":["profile_abc123"],"report_emails":["service@cloud-waste-scanner.com"]}
}' | python3 -c 'import sys,json; print(json.load(sys.stdin)["id"])')
echo "schedule_id=$schedule_id"
curl -k -X POST "$CWS_BASE_URL/v1/schedules/$schedule_id/run-now" -H "Authorization: Bearer $CWS_TOKEN"
import json
import ssl
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
text = resp.read().decode("utf-8")
return json.loads(text) if text else {}
created = api("/v1/schedules", "POST", {
"name": "nightly-cost-scan",
"enabled": True,
"run_at": 1762473600,
"interval_minutes": 1440,
"timezone": "UTC",
"scan": {
"selected_accounts": ["profile_abc123"],
"report_emails": ["service@cloud-waste-scanner.com"]
}
})
schedule_id = created["id"]
print("schedule_id =", schedule_id)
run_now = api(f"/v1/schedules/{schedule_id}/run-now", "POST")
print(json.dumps(run_now, indent=2))
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
return text ? JSON.parse(text) : {};
}
const created = await api("/v1/schedules", "POST", {
name: "nightly-cost-scan",
enabled: true,
run_at: 1762473600,
interval_minutes: 1440,
timezone: "UTC",
scan: {
selected_accounts: ["profile_abc123"],
report_emails: ["service@cloud-waste-scanner.com"]
}
});
const scheduleId = created.id;
console.log("schedule_id =", scheduleId);
const runNow = await api(`/v1/schedules/${scheduleId}/run-now`, "POST");
console.log(JSON.stringify(runNow, null, 2));
})();
Playbook 6: Webhook and MCP Integration Smoke Test
Validate event push path and MCP scan tool path in one flow.
# Create webhook (replace URL with your receiver)
curl -k -X POST "$CWS_BASE_URL/v1/webhooks" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"url":"https://example.com/cws-webhook","events":["scan.completed"],"is_active":true,"secret":"shared-secret"}'
# MCP run scan
mcp_scan_id=$(curl -k -X POST "$CWS_BASE_URL/v1/mcp/tools/run-scan" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"demo_mode":true}' \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["result"]["scan_id"])')
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/mcp/tools/get-scan/$mcp_scan_id"
import json
import ssl
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
text = resp.read().decode("utf-8")
return json.loads(text) if text else {}
webhook = api("/v1/webhooks", "POST", {
"url": "https://example.com/cws-webhook",
"events": ["scan.completed"],
"is_active": True,
"secret": "shared-secret"
})
print("webhook:", json.dumps(webhook, indent=2))
mcp_run = api("/v1/mcp/tools/run-scan", "POST", {"demo_mode": True})
mcp_scan_id = mcp_run["result"]["scan_id"]
print("mcp_scan_id =", mcp_scan_id)
mcp_scan = api(f"/v1/mcp/tools/get-scan/{mcp_scan_id}")
print("mcp_scan:", json.dumps(mcp_scan, indent=2))
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
return text ? JSON.parse(text) : {};
}
const webhook = await api("/v1/webhooks", "POST", {
url: "https://example.com/cws-webhook",
events: ["scan.completed"],
is_active: true,
secret: "shared-secret"
});
console.log("webhook:", JSON.stringify(webhook, null, 2));
const mcpRun = await api("/v1/mcp/tools/run-scan", "POST", { demo_mode: true });
const mcpScanId = mcpRun.result.scan_id;
console.log("mcp_scan_id =", mcpScanId);
const mcpScan = await api(`/v1/mcp/tools/get-scan/${mcpScanId}`);
console.log("mcp_scan:", JSON.stringify(mcpScan, null, 2));
})();
Playbook 7: Create, Update, and Delete a Cloud Account
Run full account lifecycle through API. This endpoint family is for non-AWS providers; AWS is managed through local AWS profile flow by design.
# Create
account_id=$(curl -k -X POST "$CWS_BASE_URL/v1/cloud-accounts" -H "Authorization: Bearer $CWS_TOKEN" -H "Content-Type: application/json" -d '{
"provider":"alibaba",
"name":"prod-cn",
"credentials":"{\"access_key_id\":\"ALI_ACCESS_KEY_ID\",\"access_key_secret\":\"ALI_ACCESS_KEY_SECRET\"}",
"timeout_seconds":12
}' \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["id"])')
echo "account_id=$account_id"
# Read
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/cloud-accounts/$account_id"
# Update
curl -k -X PATCH "$CWS_BASE_URL/v1/cloud-accounts/$account_id" -H "Authorization: Bearer $CWS_TOKEN" -H "Content-Type: application/json" -d '{
"name":"prod-cn-finops",
"timeout_seconds":20
}'
# Delete
curl -k -X DELETE "$CWS_BASE_URL/v1/cloud-accounts/$account_id" -H "Authorization: Bearer $CWS_TOKEN"
import json
import ssl
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
text = resp.read().decode("utf-8")
return json.loads(text) if text else {}
created = api("/v1/cloud-accounts", "POST", {
"provider": "alibaba",
"name": "prod-cn",
"credentials": json.dumps({
"access_key_id": "ALI_ACCESS_KEY_ID",
"access_key_secret": "ALI_ACCESS_KEY_SECRET"
}),
"timeout_seconds": 12
})
account_id = created["id"]
print("account_id =", account_id)
detail = api(f"/v1/cloud-accounts/{account_id}")
print("detail:", json.dumps(detail, indent=2))
updated = api(f"/v1/cloud-accounts/{account_id}", "PATCH", {
"name": "prod-cn-finops",
"timeout_seconds": 20
})
print("updated:", json.dumps(updated, indent=2))
deleted = api(f"/v1/cloud-accounts/{account_id}", "DELETE")
print("deleted:", json.dumps(deleted, indent=2))
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
return text ? JSON.parse(text) : {};
}
const created = await api("/v1/cloud-accounts", "POST", {
provider: "alibaba",
name: "prod-cn",
credentials: JSON.stringify({
access_key_id: "ALI_ACCESS_KEY_ID",
access_key_secret: "ALI_ACCESS_KEY_SECRET"
}),
timeout_seconds: 12
});
const accountId = created.id;
console.log("account_id =", accountId);
const detail = await api(`/v1/cloud-accounts/${accountId}`);
console.log("detail:", JSON.stringify(detail, null, 2));
const updated = await api(`/v1/cloud-accounts/${accountId}`, "PATCH", {
name: "prod-cn-finops",
timeout_seconds: 20
});
console.log("updated:", JSON.stringify(updated, null, 2));
const deleted = await api(`/v1/cloud-accounts/${accountId}`, "DELETE");
console.log("deleted:", JSON.stringify(deleted, null, 2));
})();
Playbook 8: Import Cloud Accounts from JSON and Run Scan
Batch-import account definitions from JSON, capture returned account IDs, and run a scan with only the imported scope.
# 1) Prepare import file
cat > accounts.json <<'JSON'
[
{
"provider": "alibaba",
"name": "import-cn-prod",
"credentials": {
"access_key_id": "ALI_ACCESS_KEY_ID",
"access_key_secret": "ALI_ACCESS_KEY_SECRET"
},
"timeout_seconds": 12
},
{
"provider": "azure",
"name": "import-eu-prod",
"credentials": {
"tenant_id": "AZ_TENANT_ID",
"client_id": "AZ_CLIENT_ID",
"client_secret": "AZ_CLIENT_SECRET",
"subscription_id": "AZ_SUBSCRIPTION_ID"
}
}
]
JSON
# 2) Import each item via /v1/cloud-accounts and collect IDs
> imported_ids.txt
jq -c '.[]' accounts.json | while IFS= read -r item; do
payload=$(printf '%s' "$item" | jq -c '
{
provider,
name,
credentials: (.credentials | tostring),
timeout_seconds,
policy_custom,
proxy_profile_id
} | with_entries(select(.value != null))
')
account_id=$(curl -k -X POST "$CWS_BASE_URL/v1/cloud-accounts" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d "$payload" \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["id"])')
echo "$account_id" >> imported_ids.txt
done
# 3) Convert imported IDs to selected_accounts JSON array
selected_accounts=$(python3 - <<'PY'
import json
with open("imported_ids.txt", "r", encoding="utf-8") as f:
ids = [line.strip() for line in f if line.strip()]
print(json.dumps(ids))
PY
)
# 4) Start scan for imported accounts only
scan_id=$(curl -k -X POST "$CWS_BASE_URL/v1/scans" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d "{\"selected_accounts\":$selected_accounts}" \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["scan_id"])')
echo "scan_id=$scan_id"
# 5) Poll final state
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/scans/$scan_id"
import json
import ssl
import time
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
text = resp.read().decode("utf-8")
return json.loads(text) if text else {}
with open("accounts.json", "r", encoding="utf-8") as f:
accounts = json.load(f)
imported_ids = []
for item in accounts:
payload = {
"provider": item["provider"],
"name": item["name"],
"credentials": json.dumps(item["credentials"])
}
if "timeout_seconds" in item:
payload["timeout_seconds"] = item["timeout_seconds"]
if "policy_custom" in item:
payload["policy_custom"] = item["policy_custom"]
if "proxy_profile_id" in item:
payload["proxy_profile_id"] = item["proxy_profile_id"]
created = api("/v1/cloud-accounts", "POST", payload)
imported_ids.append(created["id"])
print("imported_ids:", imported_ids)
scan = api("/v1/scans", "POST", {"selected_accounts": imported_ids})
scan_id = scan["scan_id"]
print("scan_id =", scan_id)
while True:
detail = api(f"/v1/scans/{scan_id}")
status = detail.get("status", "")
print("status =", status)
if status in {"completed", "failed", "canceled"}:
break
time.sleep(2)
print(json.dumps(detail, indent=2))
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const fs = require("node:fs");
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
return text ? JSON.parse(text) : {};
}
const accounts = JSON.parse(fs.readFileSync("accounts.json", "utf-8"));
const importedIds = [];
for (const item of accounts) {
const payload = {
provider: item.provider,
name: item.name,
credentials: JSON.stringify(item.credentials),
...(item.timeout_seconds !== undefined ? { timeout_seconds: item.timeout_seconds } : {}),
...(item.policy_custom !== undefined ? { policy_custom: item.policy_custom } : {}),
...(item.proxy_profile_id !== undefined ? { proxy_profile_id: item.proxy_profile_id } : {})
};
const created = await api("/v1/cloud-accounts", "POST", payload);
importedIds.push(created.id);
}
console.log("imported_ids:", importedIds);
const scan = await api("/v1/scans", "POST", { selected_accounts: importedIds });
const scanId = scan.scan_id;
console.log("scan_id =", scanId);
let detail = {};
while (true) {
detail = await api(`/v1/scans/${scanId}`);
const status = detail.status || "";
console.log("status =", status);
if (["completed", "failed", "canceled"].includes(status)) break;
await new Promise((resolve) => setTimeout(resolve, 2000));
}
console.log(JSON.stringify(detail, null, 2));
})();
Playbook 9: Weekly Governance Loop with Full-Scope Fallback
Use requested account targets when they exist, auto-fallback to full-scope scan when they do not, and persist a weekly schedule with the validated scan scope.
This playbook explicitly covers edge handling for POST /v1/scans: no body (full scan), empty target resolution, and targeted-run fallback.
export CWS_BASE_URL="https://192.168.1.4:9098"
export CWS_TOKEN="YOUR_API_TOKEN"
export CWS_TARGET_IDS="profile_aws_prod,profile_ali_cn" # optional; empty means full-scope scan
export CWS_REPORT_EMAIL="finops@company.com" # optional
# 1) Normalize runtime settings and notification policy
curl -k -X PATCH "$CWS_BASE_URL/v1/settings/general" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"currency":"USD","api_timeout_seconds":15}'
curl -k -X PATCH "$CWS_BASE_URL/v1/notifications/policy" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"notification_trigger_mode":"scan_complete"}'
# 2) Resolve requested targets against current account inventory
accounts_json=$(curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/accounts")
selected_accounts=$(python3 - <<'PY' <<<"$accounts_json"
import json
import os
import sys
accounts = json.loads(sys.stdin.read() or "[]")
wanted = [item.strip() for item in os.getenv("CWS_TARGET_IDS", "").split(",") if item.strip()]
available = {item.get("id", "") for item in accounts if item.get("id")}
selected = [item for item in wanted if item in available]
print(json.dumps(selected))
PY
)
export SELECTED_ACCOUNTS_JSON="$selected_accounts"
poll_scan_until_terminal() {
local scan_id="$1"
while true; do
detail=$(curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/scans/$scan_id")
status=$(python3 -c 'import sys,json; print(json.load(sys.stdin).get("status",""))' <<<"$detail")
echo "scan_id=$scan_id status=$status"
if [ "$status" = "completed" ] || [ "$status" = "failed" ] || [ "$status" = "canceled" ]; then
printf "%s" "$detail" > "scan_${scan_id}.json"
break
fi
sleep 2
done
}
# 3) Start scan with adaptive scope
if [ "$selected_accounts" = "[]" ]; then
echo "No valid target IDs matched. Starting full-scope scan (no body)."
scan_scope="full"
scan_id=$(curl -k -X POST "$CWS_BASE_URL/v1/scans" -H "Authorization: Bearer $CWS_TOKEN" | python3 -c 'import sys,json; print(json.load(sys.stdin)["scan_id"])')
else
scan_scope="targeted"
scan_payload=$(python3 - <<'PY'
import json
import os
payload = {"selected_accounts": json.loads(os.environ["SELECTED_ACCOUNTS_JSON"])}
email = os.getenv("CWS_REPORT_EMAIL", "").strip()
if email:
payload["report_emails"] = [email]
print(json.dumps(payload))
PY
)
scan_id=$(curl -k -X POST "$CWS_BASE_URL/v1/scans" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d "$scan_payload" \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["scan_id"])')
fi
poll_scan_until_terminal "$scan_id"
status=$(python3 -c 'import sys,json; print(json.load(sys.stdin).get("status",""))' < "scan_${scan_id}.json")
if [ "$status" = "failed" ] && [ "$scan_scope" = "targeted" ]; then
echo "Targeted scan failed. Retrying once with full-scope scan."
scan_scope="full_fallback"
scan_id=$(curl -k -X POST "$CWS_BASE_URL/v1/scans" -H "Authorization: Bearer $CWS_TOKEN" | python3 -c 'import sys,json; print(json.load(sys.stdin)["scan_id"])')
poll_scan_until_terminal "$scan_id"
fi
# 4) Persist weekly schedule using validated scope and run once now
schedule_payload=$(python3 - <<'PY'
import json
import os
import time
selected = json.loads(os.environ["SELECTED_ACCOUNTS_JSON"])
email = os.getenv("CWS_REPORT_EMAIL", "").strip()
scan = {}
if selected:
scan["selected_accounts"] = selected
if email:
scan["report_emails"] = [email]
payload = {
"name": "weekly-governance-loop",
"enabled": True,
"run_at": int(time.time()) + 300,
"interval_minutes": 10080,
"timezone": "UTC",
"scan": scan
}
print(json.dumps(payload))
PY
)
schedule_id=$(curl -k -X POST "$CWS_BASE_URL/v1/schedules" \
-H "Authorization: Bearer $CWS_TOKEN" \
-H "Content-Type: application/json" \
-d "$schedule_payload" \
| python3 -c 'import sys,json; print(json.load(sys.stdin)["id"])')
curl -k -X POST "$CWS_BASE_URL/v1/schedules/$schedule_id/run-now" -H "Authorization: Bearer $CWS_TOKEN"
echo "schedule_id=$schedule_id"
import json
import ssl
import time
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
TARGET_IDS = ["profile_aws_prod", "profile_ali_cn"] # empty list => full-scope
REPORT_EMAIL = "finops@company.com" # optional
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
text = resp.read().decode("utf-8")
return json.loads(text) if text else {}
def create_scan(payload=None):
return api("/v1/scans", "POST", payload)
def poll(scan_id):
while True:
detail = api(f"/v1/scans/{scan_id}")
status = detail.get("status", "")
print("scan_id =", scan_id, "status =", status)
if status in {"completed", "failed", "canceled"}:
return detail
time.sleep(2)
# 1) Normalize settings and policy
api("/v1/settings/general", "PATCH", {"currency": "USD", "api_timeout_seconds": 15})
api("/v1/notifications/policy", "PATCH", {"notification_trigger_mode": "scan_complete"})
# 2) Resolve requested IDs against /v1/accounts
accounts = api("/v1/accounts")
available_ids = {item.get("id") for item in accounts if item.get("id")}
selected = [item for item in TARGET_IDS if item in available_ids]
scan_payload = None
if selected:
scan_payload = {"selected_accounts": selected}
if REPORT_EMAIL:
scan_payload["report_emails"] = [REPORT_EMAIL]
else:
print("No valid targets matched. Using full-scope scan (no request body).")
# 3) Run scan and fallback once if targeted run fails
scan = create_scan(scan_payload)
scan_id = scan["scan_id"]
detail = poll(scan_id)
if detail.get("status") == "failed" and selected:
print("Targeted scan failed, retrying once with full-scope.")
scan = create_scan(None)
scan_id = scan["scan_id"]
detail = poll(scan_id)
print("final_scan:", json.dumps(detail, indent=2))
# 4) Save weekly schedule and verify by run-now
schedule_scan = {}
if selected:
schedule_scan["selected_accounts"] = selected
if REPORT_EMAIL:
schedule_scan["report_emails"] = [REPORT_EMAIL]
schedule_payload = {
"name": "weekly-governance-loop",
"enabled": True,
"run_at": int(time.time()) + 300,
"interval_minutes": 10080,
"timezone": "UTC",
"scan": schedule_scan
}
created = api("/v1/schedules", "POST", schedule_payload)
schedule_id = created["id"]
run_now = api(f"/v1/schedules/{schedule_id}/run-now", "POST")
print("schedule_id =", schedule_id)
print("run_now:", json.dumps(run_now, indent=2))
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
const TARGET_IDS = ["profile_aws_prod", "profile_ali_cn"]; // empty => full-scope
const REPORT_EMAIL = "finops@company.com"; // optional
async function api(path, method = "GET", payload = undefined) {
const headers = { Authorization: `Bearer ${TOKEN}` };
const options = { method, headers };
if (payload !== undefined) {
headers["Content-Type"] = "application/json";
options.body = JSON.stringify(payload);
}
const res = await fetch(`${BASE_URL}${path}`, options);
const text = await res.text();
return text ? JSON.parse(text) : {};
}
async function createScan(payload = undefined) {
return api("/v1/scans", "POST", payload);
}
async function poll(scanId) {
while (true) {
const detail = await api(`/v1/scans/${scanId}`);
const status = detail.status || "";
console.log("scan_id =", scanId, "status =", status);
if (["completed", "failed", "canceled"].includes(status)) return detail;
await new Promise((resolve) => setTimeout(resolve, 2000));
}
}
await api("/v1/settings/general", "PATCH", { currency: "USD", api_timeout_seconds: 15 });
await api("/v1/notifications/policy", "PATCH", { notification_trigger_mode: "scan_complete" });
const accounts = await api("/v1/accounts");
const availableIds = new Set(accounts.filter((x) => x.id).map((x) => x.id));
const selected = TARGET_IDS.filter((id) => availableIds.has(id));
let scanPayload;
if (selected.length > 0) {
scanPayload = { selected_accounts: selected };
if (REPORT_EMAIL) scanPayload.report_emails = [REPORT_EMAIL];
} else {
console.log("No valid targets matched. Using full-scope scan (no request body).");
}
let scan = await createScan(scanPayload);
let scanId = scan.scan_id;
let detail = await poll(scanId);
if (detail.status === "failed" && selected.length > 0) {
console.log("Targeted scan failed, retrying once with full-scope.");
scan = await createScan(undefined);
scanId = scan.scan_id;
detail = await poll(scanId);
}
console.log("final_scan:", JSON.stringify(detail, null, 2));
const scheduleScan = {};
if (selected.length > 0) scheduleScan.selected_accounts = selected;
if (REPORT_EMAIL) scheduleScan.report_emails = [REPORT_EMAIL];
const schedulePayload = {
name: "weekly-governance-loop",
enabled: true,
run_at: Math.floor(Date.now() / 1000) + 300,
interval_minutes: 10080,
timezone: "UTC",
scan: scheduleScan
};
const created = await api("/v1/schedules", "POST", schedulePayload);
const scheduleId = created.id;
const runNow = await api(`/v1/schedules/${scheduleId}/run-now`, "POST");
console.log("schedule_id =", scheduleId);
console.log("run_now:", JSON.stringify(runNow, null, 2));
})();
- Edge case covered: if requested IDs are missing or stale, the flow automatically switches to full-scope scan by omitting request body.
- Edge case covered: if targeted scan fails, the flow retries once with full-scope before persisting schedule.
- Contract note: schedule payload keeps
scan: {}for full-scope mode, orselected_accountsfor stable targeted mode.
Playbook 10: Kubernetes and Container Waste Scan
Use the Local API to run a read-only Kubernetes scan through local kubectl, then fetch findings, inventory, and governance reports without sending kubeconfig or scan results to a hosted control plane.
- Prerequisite:
kubectlis installed on the operator machine. - Prerequisite: kubeconfig/context has read-only access to pods, nodes, PVs, PVCs, services, deployments, StatefulSets, and DaemonSets.
- Scope note: this is local kubectl execution, not a direct kube-apiserver SDK client.
export KUBECONFIG_PATH="/home/operator/.kube/config"
export KUBE_CONTEXT="prod"
kubectl --kubeconfig "$KUBECONFIG_PATH" --context "$KUBE_CONTEXT" version --client
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/k8s/contexts?path=$KUBECONFIG_PATH"
curl -k -X POST "$CWS_BASE_URL/v1/k8s/scans" -H "Authorization: Bearer $CWS_TOKEN" -H "Content-Type: application/json" -d "{\"kubeconfig_path\":\"$KUBECONFIG_PATH\",\"kube_context\":\"$KUBE_CONTEXT\"}"
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/k8s/findings"
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/reports/k8s-governance"
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/reports/k8s-action-plan"
import json
import ssl
import urllib.parse
import urllib.request
BASE_URL = "https://192.168.1.4:9098"
TOKEN = "YOUR_API_TOKEN"
KUBECONFIG_PATH = "/home/operator/.kube/config"
KUBE_CONTEXT = "prod"
TLS_CONTEXT = ssl._create_unverified_context()
def api(path, method="GET", payload=None):
headers = {"Authorization": f"Bearer {TOKEN}"}
body = None
if payload is not None:
headers["Content-Type"] = "application/json"
body = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)
with urllib.request.urlopen(req, context=TLS_CONTEXT, timeout=30) as resp:
text = resp.read().decode("utf-8")
return json.loads(text) if text else {}
query = urllib.parse.urlencode({"path": KUBECONFIG_PATH})
print(api(f"/v1/k8s/contexts?{query}"))
print(api("/v1/k8s/scans", "POST", {"kubeconfig_path": KUBECONFIG_PATH, "kube_context": KUBE_CONTEXT}))
print(api("/v1/k8s/findings"))
print(api("/v1/reports/k8s-governance"))
print(api("/v1/reports/k8s-action-plan"))
(async () => {
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
const BASE_URL = "https://192.168.1.4:9098";
const TOKEN = "YOUR_API_TOKEN";
const KUBECONFIG_PATH = "/home/operator/.kube/config";
const KUBE_CONTEXT = "prod";
async function api(path, method = "GET", payload = null) {
const res = await fetch(`${BASE_URL}${path}`, {
method,
headers: {
Authorization: `Bearer ${TOKEN}`,
...(payload ? { "Content-Type": "application/json" } : {})
},
body: payload ? JSON.stringify(payload) : undefined
});
const text = await res.text();
return text ? JSON.parse(text) : {};
}
const query = new URLSearchParams({ path: KUBECONFIG_PATH }).toString();
console.log(await api(`/v1/k8s/contexts?${query}`));
console.log(await api("/v1/k8s/scans", "POST", { kubeconfig_path: KUBECONFIG_PATH, kube_context: KUBE_CONTEXT }));
console.log(await api("/v1/k8s/findings"));
console.log(await api("/v1/reports/k8s-governance"));
console.log(await api("/v1/reports/k8s-action-plan"));
})();
If you want a combined cloud-plus-Kubernetes run, call POST /v1/scans with include_kubernetes=true, kubeconfig_path, and kube_context.
Playbook 11: OpenAPI, SDK, Pagination, and Webhook Verification
Use this flow when you are wiring CWS into internal automation and need a stable contract, paginated reads, and signed downstream events.
# Discover the active route contract.
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/openapi.json"
# Check versioning, pagination, and webhook-signing policy.
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/meta/compatibility"
# Read findings with cursor pagination envelope.
curl -k -H "Authorization: Bearer $CWS_TOKEN" "$CWS_BASE_URL/v1/findings?envelope=true&limit=50"
from cws_client import CwsClient, verify_webhook_signature
client = CwsClient(base_url="https://127.0.0.1:9123", token="YOUR_API_TOKEN")
print(client.openapi())
print(client.compatibility())
print(client.list_findings(limit=50))
ok = verify_webhook_signature(
secret="shared-secret",
timestamp="1762502400",
body='{"event":"scan.completed"}',
signature="sha256=RECEIVED_SIGNATURE",
)
import { CwsClient, verifyWebhookSignature } from "@cloud-waste-scanner/client";
const client = new CwsClient({ baseUrl: "https://127.0.0.1:9123", token: "YOUR_API_TOKEN" });
console.log(await client.openapi());
console.log(await client.compatibility());
console.log(await client.listFindings({ limit: 50 }));
const ok = verifyWebhookSignature({
secret: "shared-secret",
timestamp: "1762502400",
body: '{"event":"scan.completed"}',
signature: "sha256=RECEIVED_SIGNATURE",
});
Failure Handling Runbook
- If request is rejected immediately, verify HTTPS/base URL, bearer token, and payload schema first.
- If settings patch fails with
400, checknotification_trigger_modevalue isscan_completeorwaste_only. - If scan enters
failed, inspect job error plus route/proxy diagnostics and rerun with smallerselected_accountsscope. - If report delivery fails, keep scan evidence and retry report generation/download as an independent step.
- If webhook delivery fails, run webhook test endpoint and compare destination reachability and response status.
Map failure classes to stable codes in API Errors Catalog, then confirm endpoint-specific recovery behavior in API Reference.
Quota Safety Guarantees
- If no active checks run for selected accounts, the scan is rejected and not counted toward scan quota.
- If all checks fail and no cloud data is collected, the scan is rejected and not counted toward scan quota.
- Connection/proxy/notification test operations do not consume scan quota.
Run the playbooks against your own local API endpoint.
Save your first $1,000 before the next billing cycle.