How-to Guides

This section walks you through using the API to manage your organization’s dashboards. These guides assume a basic understanding of programming. The examples are written in Python, but the concepts should be straightforward to translate to other languages.

Hint

We’ll be assuming you have a valid API token and API endpoint. If you don’t have those, please reach out to api@aspiredu.com for help.

Uploading changes to multiple dashboards

This section will walk you through how to upload a complete set of changes to several dashboards. It will utilize a clear_existing_policy value of 'all', which will first clear all students/viewers from every dashboard before processing the data in the upload.
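
As a sketch, the body of that upload request has the following shape (the rows are illustrative; the fields match the upload_data function in the code example below):

payload = {
    "clear_existing_policy": "all",
    "data_type": "student",
    "data": [
        {"dashboard_name": "foo", "student_sis_id": "a1"},
        {"dashboard_name": "foo", "student_sis_id": "a2"},
    ],
}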

General flow

  1. Collect changes (dashboard name, student SIS ID) or (dashboard name, viewer SIS ID, is ally)
    1. {"dashboard_name": str, "student_sis_id": str}

    2. {"dashboard_name": str, "viewer_sis_id": str, "viewer_is_ally": bool}

  2. Upload changes and store the upload id from the response

  3. Use the upload id to poll the upload’s detail URL periodically for status updates

  4. Send a status update to stakeholders (logs, email, etc.)

Code example

import json
import time
from logging import getLogger
from urllib.parse import urljoin
# This uses a third-party library to make the HTTP requests.
# It can be installed via:
#   pip install requests
import requests
# When you get an API token, you'll be provided with the URL
# you need to interact with.
API_TOKEN = "***"
BACKEND_URL = "https://[your backend here].aspiredu.com"
logger = getLogger(__name__)


def upload_data(data, data_type, clear_existing_policy):
    """Upload the dashboard data to AspirEDU"""
    url = urljoin(BACKEND_URL, "/detective/api/v2/dashboard/upload/")
    payload = {
        "clear_existing_policy": clear_existing_policy,
        "data_type": data_type,
        "data": data,
    }
    results = requests.post(
        url, json=payload, headers={"Authorization": f"Bearer {API_TOKEN}"}
    )
    return results.json()


def fetch_upload(upload_id):
    """Fetch the dashboard upload's details"""
    url = urljoin(BACKEND_URL, f"/detective/api/v2/dashboard/upload/{upload_id}/")
    results = requests.get(url, headers={"Authorization": f"Bearer {API_TOKEN}"})
    # The response includes the upload's id and current status
    return results.json()


def upload_and_report(data, data_type):
    """
    Upload the data to AspirEDU and report the status if not successful.
    """
    # Be careful with clear_existing_policy="all" as
    # it will remove all dashboards of that data type.
    upload = upload_data(data, data_type=data_type, clear_existing_policy="all")

    while upload["status"] == "uploading":
        # Wait for the upload to complete
        time.sleep(30)
        upload = fetch_upload(upload["id"])

    # Report status
    if upload["status"] == "success_with_missing":
        # Send an email or log the error
        logger.warning(
            json.dumps({
                "event": "Dashboard upload has missing users",
                "upload": upload
            })
        )
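        # send_email is a stand-in for your own notification helper;
        # a minimal sketch is given after this example.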
        send_email(
            to_email="",
            from_email="",
            subject="Missing users for upload",
            body=f"Missing {data_type}s for dashboard upload: {upload['missing']}",
        )
    elif upload["status"] == "error":
        # Send an email or log the error
        logger.error(
            json.dumps({
                "event": "Dashboard upload error",
                "upload": upload
            })
        )
        send_email(
            to_email="",
            from_email="",
            subject="Error on upload",
            body=f"Dashboard Upload {upload['id']} had error: {upload['error']}",
        )
    else:  # Success
        logger.info(
            json.dumps({
                "event": "Dashboard upload success",
                "upload": upload
            })
        )


# You will need to collect this information from your own system.
# This shows what structure it should be in.
viewers = [
    {"dashboard_name": "foo", "viewer_sis_id": "123", "viewer_is_ally": True},
    {"dashboard_name": "foo", "viewer_sis_id": "456", "viewer_is_ally": False},
    {"dashboard_name": "bar", "viewer_sis_id": "123", "viewer_is_ally": True},
]
students = [
    {"dashboard_name": "foo", "student_sis_id": "a1"},
    {"dashboard_name": "foo", "student_sis_id": "a2"},
    {"dashboard_name": "bar", "student_sis_id": "a1"},
    {"dashboard_name": "bar", "student_sis_id": "a3"},
]

upload_and_report(viewers, "viewer")
upload_and_report(students, "student")
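
Note that the examples in this guide call a send_email helper that is not part of the API; it stands in for whatever notification mechanism your system uses. A minimal sketch using Python’s standard-library smtplib (the SMTP host and addresses are placeholders you would supply) might look like this:

import smtplib
from email.message import EmailMessage


def send_email(to_email, from_email, subject, body):
    """Placeholder notification helper; replace with your own system's."""
    message = EmailMessage()
    message["To"] = to_email
    message["From"] = from_email
    message["Subject"] = subject
    message.set_content(body)
    # "localhost" assumes a local mail relay; point this at your SMTP server.
    with smtplib.SMTP("localhost") as server:
        server.send_message(message)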

Detecting changes to dashboards

This section will walk you through how to compare your expected dashboard collection with what currently exists. This can be useful for detecting whether administrators are modifying dashboards from within the Dropout Detective settings area.

General flow

  1. Collect expected memberships (dashboard name, student SIS ID) or (dashboard name, viewer SIS ID, is ally)

  2. Make an API request for each dashboard’s students and viewers

  3. Compare the expected SIS IDs with the existing ones for each dashboard (sketched below)

  4. If there are differences, send a report to stakeholders (logs, email, etc.)
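
The comparison in step 3 comes down to two set differences per dashboard. A minimal sketch with illustrative SIS IDs:

expected = {"a1", "a2", "a3"}  # What your system says should be on the dashboard
existing = {"a1", "a4"}        # What the API reports is currently on it
# Users that should be present but aren't: they need to be added.
missing_users = sorted(expected - existing)     # ["a2", "a3"]
# Users that are present but shouldn't be: they need to be removed.
unexpected_users = sorted(existing - expected)  # ["a4"]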

Code example

import json
from collections import defaultdict
from logging import getLogger
from urllib.parse import urljoin
# This uses a third-party library to make the HTTP requests.
# It can be installed via:
#   pip install requests
import requests
# When you get an API token, you'll be provided with the URL
# you need to interact with.
API_TOKEN = "***"
BACKEND_URL = "https://[your backend here].aspiredu.com"
logger = getLogger(__name__)


def paginate_results(initial_url):
    """Paginate through the results of an API endpoint"""
    results = requests.get(
        initial_url, headers={"Authorization": f"Bearer {API_TOKEN}"}
    )
    collection = results.json()
    yield from collection["results"]
    # The API endpoint is paginated. If there's a next page, fetch that.
    while collection["next"]:
        # Fetch the next page
        results = requests.get(
            collection["next"], headers={"Authorization": f"Bearer {API_TOKEN}"}
        )
        collection = results.json()
        yield from collection["results"]


def fetch_dashboard_users(dashboard_id, data_type):
    """Fetch the users for a given dashboard"""
    url = urljoin(
        BACKEND_URL, f"/detective/api/v2/dashboard/{dashboard_id}/{data_type}/")
    yield from paginate_results(url)


def fetch_dashboards():
    """Fetch the dashboards"""
    url = urljoin(BACKEND_URL, "/detective/api/v2/dashboard/")
    yield from paginate_results(url)


def check_dashboard_differences(data, data_type):
    """
    Check for dashboard differences for the given data type

    This assumes the data is in the format of:
    [
        {"dashboard_name": "foo", "viewer_sis_id": "123", "viewer_is_ally": True},
    ]
    """
    # Restructure the data to be in the form of
    # {"foo": set("123")}
    # This is easier to compare to what is coming in from the API.
    expected_sis_ids = defaultdict(set)
    for element in data:
        # pop() removes dashboard_name from the element dictionary;
        # note this mutates the input data.
        dashboard_name = element.pop("dashboard_name")
        expected_sis_ids[dashboard_name].add(element[f"{data_type}_sis_id"])
    # Iterate over all the dashboards
    for dashboard in fetch_dashboards():
        existing_sis_ids = set()
        for user in fetch_dashboard_users(dashboard["id"], data_type):
            if user["sis_id"]:
                existing_sis_ids.add(user["sis_id"])
        # This identifies any memberships that don't exist and need to be added
        # in Dropout Detective
        missing_users = sorted(
            expected_sis_ids[dashboard["name"]] - existing_sis_ids
        )
        # This identifies any memberships that need to be removed
        # in Dropout Detective
        unexpected_users = sorted(
            existing_sis_ids - expected_sis_ids[dashboard["name"]]
        )
        if missing_users or unexpected_users:
            logger.warning(
                json.dumps({
                    "missing_users": missing_users,
                    "unexpected_users": unexpected_users,
                    "dashboard": dashboard["name"]
                })
            )
            send_email(
                to_email="",
                from_email="",
                subject=f"Differences detected in dashboard {dashboard['name']}",
                body=(
                    f"Missing users: {missing_users}\n"
                    f"Unexpected users: {unexpected_users}"
                ),
            )


# You will need to collect this information from your own system.
# This shows what structure it should be in.
viewers = [
    {"dashboard_name": "foo", "viewer_sis_id": "123", "viewer_is_ally": True},
    {"dashboard_name": "foo", "viewer_sis_id": "456", "viewer_is_ally": False},
    {"dashboard_name": "bar", "viewer_sis_id": "123", "viewer_is_ally": True},
]
students = [
    {"dashboard_name": "foo", "student_sis_id": "a1"},
    {"dashboard_name": "foo", "student_sis_id": "a2"},
    {"dashboard_name": "bar", "student_sis_id": "a1"},
    {"dashboard_name": "bar", "student_sis_id": "a3"},
]

check_dashboard_differences(viewers, "viewer")
check_dashboard_differences(students, "student")

Uploading many changes to multiple dashboards

This section will walk you through how to upload many changes to a variety of dashboards. Let’s assume that not all of the dashboards need to be updated and some should remain untouched. Let’s also assume that our data should override the existing data in the dashboard.

To do this, it will utilize a clear_existing_policy value of 'contained', which will first clear only the students/viewers of the dashboards included in the upload’s data. Since each upload will only contain the data for a single dashboard, this effectively clears and resets that dashboard to the new data.
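
As a sketch, one such upload request body might look like the following; only the dashboards named in the data are cleared and reset:

payload = {
    "clear_existing_policy": "contained",
    "data_type": "viewer",
    "data": [
        # Only dashboard "foo" appears here, so only "foo" is cleared
        # and reset; other dashboards (e.g. "bar") are left untouched.
        {"dashboard_name": "foo", "viewer_sis_id": "123", "viewer_is_ally": True},
        {"dashboard_name": "foo", "viewer_sis_id": "456", "viewer_is_ally": False},
    ],
}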

General flow

  1. Collect changes per dashboard (dashboard name, student SIS ID) or (dashboard name, viewer SIS ID, is ally)

  2. Upload the changes per dashboard and store the id for each upload

  3. Check the ids periodically for success or failure

  4. Send a status update to stakeholders (logs, email, etc.)

Code example

import json
import time
from logging import getLogger
from urllib.parse import urljoin
# This uses a third-party library to make the HTTP requests.
# It can be installed via:
#   pip install requests
import requests
# When you get an API token, you'll be provided with the URL
# you need to interact with.
API_TOKEN = "***"
BACKEND_URL = "https://[your backend here].aspiredu.com"
logger = getLogger(__name__)


def upload_data(data, data_type, clear_existing_policy):
    """Upload the dashboard data to AspirEDU"""
    url = urljoin(BACKEND_URL, "/detective/api/v2/dashboard/upload/")
    payload = {
        "clear_existing_policy": clear_existing_policy,
        "data_type": data_type,
        "data": data,
    }
    results = requests.post(
        url, json=payload, headers={"Authorization": f"Bearer {API_TOKEN}"}
    )
    return results.json()


def fetch_upload(upload_id):
    """Fetch the dashboard upload's details"""
    url = urljoin(BACKEND_URL, f"/detective/api/v2/dashboard/upload/{upload_id}/")
    results = requests.get(url, headers={"Authorization": f"Bearer {API_TOKEN}"})
    # The response includes the upload's id and current status
    return results.json()


def check_if_uploaded(upload_id):
    """
    Check if a dashboard upload has completed and send a report if it has.

    Return True if it has been uploaded, False otherwise.
    """
    upload = fetch_upload(upload_id)
    if upload["status"] == "uploading":
        # Still uploading
        return False

    # Report status
    if upload["status"] == "success_with_missing":
        # Send an email or log the error
        logger.warning(
            json.dumps({
                "event": "Dashboard upload has missing users",
                "upload": upload,
            })
        )
        send_email(
            to_email="",
            from_email="",
            subject="Missing users for upload",
            body=(
                f"Missing {upload['data_type']}s for "
                f"dashboard upload: {upload['missing']}"
            ),
        )
    elif upload["status"] == "error":
        # Send an email or log the error
        logger.error(
            json.dumps({
                "event": "Dashboard upload error",
                "upload": upload,
            })
        )
        send_email(
            to_email="",
            from_email="",
            subject="Error on upload",
            body=f"Dashboard upload {upload['id']} had error: {upload['error']}",
        )
    else:  # Success
        logger.info(
            json.dumps({
                "event": "Dashboard upload success",
                "upload": upload,
            })
        )
    return True


# You will need to collect this information from your own system.
# This shows what structure it should be in.
viewers = [
    [
        {"dashboard_name": "foo", "viewer_sis_id": "123", "viewer_is_ally": True},
        {"dashboard_name": "foo", "viewer_sis_id": "456", "viewer_is_ally": False},
    ],
    [
        {"dashboard_name": "bar", "viewer_sis_id": "123", "viewer_is_ally": True},
    ]
]
students = [
    [
        {"dashboard_name": "foo", "student_sis_id": "a1"},
        {"dashboard_name": "foo", "student_sis_id": "a2"},
    ],
    [
        {"dashboard_name": "bar", "student_sis_id": "a1"},
        {"dashboard_name": "bar", "student_sis_id": "a3"},
    ]
]
upload_ids = []
for dashboards in viewers:
    upload = upload_data(
        dashboards, data_type="viewer", clear_existing_policy="contained"
    )
    upload_ids.append(upload["id"])
for dashboards in students:
    upload = upload_data(
        dashboards, data_type="student", clear_existing_policy="contained"
    )
    upload_ids.append(upload["id"])

# Check if the uploads have completed
is_uploading = bool(upload_ids)
while is_uploading:
    # Wait for the uploads to complete
    time.sleep(30)
    # If any of the uploads are still uploading, we'll continue to wait
    # Assume that all are done and reset this to True if any are still uploading.
    is_uploading = False
    # In Python, we shouldn't remove items from a list while iterating over it,
    # so collect the completed uploads and filter them out after the loop.
    completed_uploads = set()
    for upload_id in upload_ids:
        if not check_if_uploaded(upload_id):
            is_uploading = True
        else:
            completed_uploads.add(upload_id)
    upload_ids = [
        upload_id
        for upload_id in upload_ids
        if upload_id not in completed_uploads
    ]
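
Note that if an upload never leaves the "uploading" status, the loop above will poll forever. One defensive refinement (a sketch; the 30-minute budget is an assumption you should tune) is to cap the total wait with a deadline:

MAX_WAIT_SECONDS = 30 * 60  # Assumed budget; tune to your data volume.
deadline = time.monotonic() + MAX_WAIT_SECONDS
while upload_ids and time.monotonic() < deadline:
    time.sleep(30)
    # check_if_uploaded returns True once an upload has finished,
    # so keep only the ids that are still in progress.
    upload_ids = [
        upload_id for upload_id in upload_ids
        if not check_if_uploaded(upload_id)
    ]
if upload_ids:
    logger.error("Uploads still pending after deadline: %s", upload_ids)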