Skip to main content
Chef kitchen with recipe cards for PI Web API coding patterns
Cookbook

PI Web API Cookbook

Copy-paste Python recipes for every common PI Web API task. Each recipe is self-contained, production-tested, and includes error handling.

How to use this cookbook

Every recipe is designed to be copied directly into your project. Each one includes imports, configuration, the core logic, and error handling. Recipes are organized by use case: start with Session Setup, then pick the recipes you need.

All recipes assume you have created a session using Recipe 1. Variables like session, BASE_URL, and WEB_ID are shared across recipes.

Recipes by category

Foundation

Reading data

Writing data

Batch and bulk

AF and hierarchy

Production patterns

Recipe 1: Session setup

Create a reusable session with authentication, SSL certificate handling, connection pooling, and automatic retries. Every other recipe depends on this.

session_setup.py (python)
import os
import requests

from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_pi_session(
    base_url: str | None = None,
    username: str | None = None,
    password: str | None = None,
    ca_bundle: str | bool | None = None,
    pool_size: int = 10,
    max_retries: int = 3,
) -> requests.Session:
    """Create a configured session for PI Web API.

    Args:
        base_url: PI Web API URL (falls back to PI_WEB_API_URL env var)
        username: Auth username (falls back to PI_USERNAME env var)
        password: Auth password (falls back to PI_PASSWORD env var)
        ca_bundle: Path to CA cert, True for default, False to skip verify
        pool_size: Connection pool size (increase for high concurrency)
        max_retries: Number of retries for transient failures

    Returns:
        A requests.Session with auth, SSL verification, connection
        pooling, retries, and JSON headers pre-configured.
    """
    session = requests.Session()

    # Authentication: explicit arguments win, then environment variables.
    user = username or os.environ.get("PI_USERNAME", "")
    pwd = password or os.environ.get("PI_PASSWORD", "")
    if user:
        session.auth = (user, pwd)

    # SSL verification: explicit arg > PI_CA_BUNDLE env var > default (True).
    if ca_bundle is not None:
        session.verify = ca_bundle
    else:
        env_bundle = os.environ.get("PI_CA_BUNDLE")
        session.verify = env_bundle if env_bundle else True

    # Connection pooling + retry logic for transient server/network errors.
    retries = Retry(
        total=max_retries,
        backoff_factor=1,  # 1s, 2s, 4s between retries
        status_forcelist=[408, 429, 500, 502, 503, 504],
        allowed_methods=["GET", "HEAD"],  # Only retry safe methods
    )
    adapter = HTTPAdapter(
        max_retries=retries,
        pool_connections=pool_size,
        pool_maxsize=pool_size,
    )
    # BUGFIX: mount for both schemes. The original mounted only https://,
    # so an http:// base URL silently lost pooling and retry behavior.
    session.mount("https://", adapter)
    session.mount("http://", adapter)

    # Default headers: JSON in and out; X-Requested-With identifies
    # programmatic clients to proxies/CSRF filters.
    session.headers.update({
        "Accept": "application/json",
        "Content-Type": "application/json",
        "X-Requested-With": "PiSharp-Cookbook",
    })

    return session


# Usage
BASE_URL = os.environ.get("PI_WEB_API_URL", "https://myserver/piwebapi")
session = create_pi_session()

# Smoke-test the connection by hitting the API root endpoint.
resp = session.get(f"{BASE_URL}/")
if not resp.ok:
    print(f"Connection failed: {resp.status_code}")
else:
    info = resp.json()
    print(f"Connected to {info['ProductTitle']} {info['ProductVersion']}")

Recipe 2: Find a PI point

Look up a PI point by its full path (most reliable) or by search query. Returns the WebID needed for all data operations.

find_point.py (python)
def find_point_by_path(session, base_url, server_name, point_name):
    """Find a PI point by its full path. Most reliable method.

    Returns: (web_id, point_name) tuple
    """
    # PI point paths look like \\SERVER\pointname.
    point_path = f"\\\\{server_name}\\{point_name}"
    query = {
        "path": point_path,
        "selectedFields": "WebId;Name;PointType;EngineeringUnits",
    }
    resp = session.get(f"{base_url}/points", params=query)
    resp.raise_for_status()
    body = resp.json()
    return body["WebId"], body["Name"]


def find_points_by_filter(session, base_url, data_server_web_id,
                          name_filter, max_count=100):
    """Find PI points by name pattern on a specific data server.

    Args:
        name_filter: Wildcard pattern like "Temperature*" or "Unit1_*"

    Returns: List of {WebId, Name, PointType} dicts
    """
    url = f"{base_url}/dataservers/{data_server_web_id}/points"
    query = {
        "nameFilter": name_filter,
        "maxCount": max_count,
        "selectedFields": "Items.WebId;Items.Name;Items.PointType",
    }
    resp = session.get(url, params=query)
    resp.raise_for_status()
    # No matches yields a body without an Items key; normalize to [].
    return resp.json().get("Items", [])


# Usage: path-based lookup (preferred)
web_id, name = find_point_by_path(session, BASE_URL, "MY-PI-SERVER", "sinusoid")
print(f"Found {name}: {web_id}")

# Usage: filter-based lookup (multiple points).
# A data server WebID is needed first; take the first server listed.
ds_resp = session.get(
    f"{BASE_URL}/dataservers",
    params={"selectedFields": "Items.WebId;Items.Name"},
)
ds_web_id = ds_resp.json()["Items"][0]["WebId"]

for p in find_points_by_filter(session, BASE_URL, ds_web_id, "Temperature*"):
    print(f"  {p['Name']}: {p['WebId']}")

Recipe 3: Read current value

Get the most recent snapshot value for a PI point, with proper handling for digital states and bad quality.

read_current.py (python)
def read_current_value(session, base_url, web_id):
    """Read the current snapshot value for a PI point.

    Handles digital states, bad quality, and returns a clean dict.
    """
    resp = session.get(
        f"{base_url}/streams/{web_id}/value",
        params={"selectedFields": "Timestamp;Value;Good;Questionable;Substituted"},
    )
    resp.raise_for_status()
    payload = resp.json()

    # Digital state values arrive as objects rather than plain numbers;
    # flatten them to the state name (falling back to the numeric value).
    raw = payload["Value"]
    digital = isinstance(raw, dict)
    value = raw.get("Name", raw.get("Value", str(raw))) if digital else raw

    return {
        "value": value,
        "timestamp": payload["Timestamp"],
        "good": payload.get("Good", True),
        "questionable": payload.get("Questionable", False),
        "substituted": payload.get("Substituted", False),
        "is_digital": digital,
    }


# Usage
result = read_current_value(session, BASE_URL, web_id)
if not result["good"]:
    print(f"Bad quality value: {result['value']} (questionable={result['questionable']})")
else:
    print(f"Value: {result['value']} at {result['timestamp']}")

Recipe 4: Recorded values to DataFrame

Pull recorded historical values and load them into a pandas DataFrame with proper timestamps, quality filtering, and digital state handling.

recorded_to_dataframe.py (python)
import pandas as pd
from datetime import datetime, timedelta, timezone

def recorded_to_dataframe(
    session, base_url, web_id,
    start_time="*-24h", end_time="*",
    max_count=10000, boundary_type="Inside",
    filter_good_only=True,
):
    """Fetch recorded values and return as a pandas DataFrame.

    Args:
        session: requests.Session configured for PI Web API
        base_url: PI Web API root URL
        web_id: WebID of the PI point stream
        start_time: PI time expression for the range start
        end_time: PI time expression for the range end
        max_count: Maximum number of values to request
        boundary_type: "Inside" (default), "Outside", or "Interpolated"
        filter_good_only: If True, remove bad-quality values

    Returns: DataFrame with a UTC Timestamp index and a Value column
    """
    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "maxCount": max_count,
            "boundaryType": boundary_type,
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])

    if not items:
        # Keep the empty frame shape-compatible with the populated one
        # (same column, same DatetimeIndex name).
        return pd.DataFrame(columns=["Value"],
                            index=pd.DatetimeIndex([], name="Timestamp"))

    df = pd.DataFrame(items)
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
    df = df.set_index("Timestamp").sort_index()

    # Digital state values arrive as {"Name": ..., "Value": ...} objects;
    # flatten them to the state name (numeric value as a fallback).
    def extract_value(v):
        if isinstance(v, dict):
            return v.get("Name", v.get("Value"))
        return v

    df["Value"] = df["Value"].apply(extract_value)

    # Filter by quality. .eq(True) keeps exactly the rows marked good —
    # same semantics as the original `== True`, without the E712 anti-idiom.
    if filter_good_only and "Good" in df.columns:
        df = df[df["Good"].eq(True)]

    # Warn if data was truncated: hitting maxCount exactly almost always
    # means the server had more values than it returned.
    if len(items) == max_count:
        print(f"WARNING: Returned exactly {max_count} values. "
              f"Data may be truncated. Increase maxCount or narrow time range.")

    return df[["Value"]]


# Usage: pull the last 48 hours of history and summarize it.
df = recorded_to_dataframe(session, BASE_URL, web_id, start_time="*-48h")
print(f"Got {len(df)} values")
print(df.describe())

Recipe 5: Summary statistics

Get min, max, average, total, count, and other statistics without pulling all raw data. Much faster than reading recorded values and computing client-side.

summary_values.py (python)
def get_summaries(
    session, base_url, web_id,
    start_time="*-24h", end_time="*",
    summary_type="Average,Minimum,Maximum,Count,PercentGood",
    calculation_basis="TimeWeighted",
):
    """Get summary statistics for a PI point.

    Args:
        summary_type: Comma-separated list. Options: Average, Minimum,
            Maximum, Total, Count, StdDev, Range, PercentGood, All
        calculation_basis: "TimeWeighted" (default) or "EventWeighted"
            TimeWeighted: Weights values by duration (correct for flow/rate)
            EventWeighted: Equal weight per event (correct for batch counts)

    Returns: dict mapping statistic name -> {value, timestamp, good}
    """
    resp = session.get(
        f"{base_url}/streams/{web_id}/summary",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "summaryType": summary_type,
            "calculationBasis": calculation_basis,
        },
    )
    resp.raise_for_status()

    # Each item wraps its statistic in a timestamped value envelope.
    return {
        item["Type"]: {
            "value": item["Value"].get("Value"),
            "timestamp": item["Value"].get("Timestamp"),
            "good": item["Value"].get("Good", True),
        }
        for item in resp.json().get("Items", [])
    }


# Usage: time-weighted statistics over the last 7 days.
stats = get_summaries(session, BASE_URL, web_id, start_time="*-7d")
for stat, data in stats.items():
    print(f"{stat}: {data['value']}")

# Example output:
# Average: 72.34
# Minimum: 65.12
# Maximum: 81.90
# Count: 8640
# PercentGood: 99.87

Recipe 6: Write a single value

Write one value with a timestamp to a PI point, with control over update behavior and buffering.

write_single.py (python)
from datetime import datetime, timezone

def write_value(
    session, base_url, web_id, value, timestamp=None,
    update_option="Replace", buffer_option="BufferIfPossible",
):
    """Write a single value to a PI point.

    Args:
        update_option: "Replace", "Insert", "NoReplace", "Remove",
                       "InsertNoCompression"
        buffer_option: "BufferIfPossible", "Buffer", "DoNotBuffer"
            BufferIfPossible: Queue if Data Archive is temporarily unavailable
            DoNotBuffer: Fail immediately if Data Archive is down

    Returns: True if write succeeded
    """
    # Default to "now" in UTC when the caller gives no timestamp.
    stamp = (datetime.now(timezone.utc).isoformat()
             if timestamp is None else timestamp)

    resp = session.post(
        f"{base_url}/streams/{web_id}/value",
        json={"Value": value, "Timestamp": stamp},
        params={
            "updateOption": update_option,
            "bufferOption": buffer_option,
        },
    )

    if resp.status_code not in (200, 202, 204):
        # Surface whatever error detail the server provided.
        try:
            errors = resp.json().get("Errors", [])
            print(f"Write failed ({resp.status_code}): {errors}")
        except Exception:
            print(f"Write failed ({resp.status_code}): {resp.text[:200]}")
        return False

    return True


# Usage: the timestamp defaults to "now" (UTC) when omitted.
success = write_value(session, BASE_URL, web_id, 72.5)
print(f"Write {'succeeded' if success else 'failed'}")

Write permissions required

Writing to PI points requires appropriate Data Access permissions on the PI Data Archive. If you get a 403 error, check the point security with your PI administrator.

Recipe 7: Write multiple values

Write a batch of historical values to a PI point with error checking for partial failures.

write_multiple.py (python)
def write_values(session, base_url, web_id, values,
                 update_option="Replace"):
    """Write multiple values to a PI point.

    Args:
        values: List of {"Value": ..., "Timestamp": ...} dicts

    Returns: (success_count, error_count) tuple
    """
    resp = session.post(
        f"{base_url}/streams/{web_id}/recorded",
        json=values,
        params={"updateOption": update_option},
    )
    status = resp.status_code

    # Full success: every value accepted.
    if status in (200, 202, 204):
        return len(values), 0

    # Multi-status: the server accepted some values and rejected others.
    if status == 207:
        failures = [e for e in resp.json().get("Errors", []) if e]
        return len(values) - len(failures), len(failures)

    print(f"Write failed ({status}): {resp.text[:300]}")
    return 0, len(values)


# Usage: timestamps are ISO-8601; the trailing "Z" marks UTC.
values = [
    {"Value": 70.1, "Timestamp": "2026-03-15T10:00:00Z"},
    {"Value": 71.3, "Timestamp": "2026-03-15T10:05:00Z"},
    {"Value": 72.0, "Timestamp": "2026-03-15T10:10:00Z"},
    {"Value": 73.5, "Timestamp": "2026-03-15T10:15:00Z"},
]
ok, err = write_values(session, BASE_URL, web_id, values)
print(f"Written: {ok} succeeded, {err} failed")

Recipe 8: Batch read 100+ points

Read current values for a large number of PI points using batched requests with automatic chunking. One HTTP request per chunk instead of one per point.

batch_read_chunked.py (python)
def batch_read_current(session, base_url, web_ids, chunk_size=100):
    """Read current values for many points using chunked batch requests.

    Args:
        web_ids: List of (name, web_id) tuples or just web_id strings
        chunk_size: Max sub-requests per batch (100 is a safe default)

    Returns: Dict mapping name/index to {value, timestamp, good}
    """
    # Normalize bare WebID strings into (name, web_id) pairs.
    if web_ids and isinstance(web_ids[0], str):
        web_ids = [(f"point_{i}", wid) for i, wid in enumerate(web_ids)]

    results = {}

    for start in range(0, len(web_ids), chunk_size):
        # One batch POST carries up to chunk_size GET sub-requests.
        sub_requests = {
            name: {
                "Method": "GET",
                "Resource": (
                    f"{base_url}/streams/{wid}/value"
                    f"?selectedFields=Timestamp;Value;Good"
                ),
            }
            for name, wid in web_ids[start : start + chunk_size]
        }

        resp = session.post(f"{base_url}/batch", json=sub_requests)
        resp.raise_for_status()

        for key, outcome in resp.json().items():
            if outcome["Status"] != 200:
                results[key] = {"value": None, "error": outcome["Status"]}
                continue
            content = outcome["Content"]
            value = content["Value"]
            # Flatten digital state objects to their state name.
            if isinstance(value, dict):
                value = value.get("Name", value.get("Value"))
            results[key] = {
                "value": value,
                "timestamp": content["Timestamp"],
                "good": content.get("Good", True),
            }

    return results


# Usage
point_list = [
    ("Temperature", "F1DPaH..."),
    ("Pressure", "F1DPbX..."),
    ("Flow", "F1DPcY..."),
]
values = batch_read_current(session, BASE_URL, point_list)
for name, val in values.items():
    if "error" not in val:
        print(f"{name}: {val['value']}")
    else:
        print(f"{name}: ERROR {val['error']}")

Recipe 9: AF element traversal

Walk the AF element hierarchy and read attribute values. Useful for discovering equipment structure and reading configuration from the asset model.

af_traversal.py (python)
def get_af_databases(session, base_url):
    """List all AF databases across every asset server."""
    resp = session.get(f"{base_url}/assetservers")
    resp.raise_for_status()

    databases = []
    for server in resp.json().get("Items", []):
        db_resp = session.get(
            f"{base_url}/assetservers/{server['WebId']}/assetdatabases",
            params={"selectedFields": "Items.WebId;Items.Name"},
        )
        # Skip servers that fail to enumerate rather than aborting the scan.
        if db_resp.ok:
            databases.extend(db_resp.json().get("Items", []))
    return databases


def get_child_elements(session, base_url, parent_web_id, max_count=1000):
    """Get child elements of an AF element."""
    url = f"{base_url}/elements/{parent_web_id}/elements"
    query = {
        "maxCount": max_count,
        "selectedFields": "Items.WebId;Items.Name;Items.TemplateName;Items.HasChildren",
    }
    resp = session.get(url, params=query)
    resp.raise_for_status()
    # Leaf elements return a body without Items; normalize to [].
    return resp.json().get("Items", [])


def get_element_attributes(session, base_url, element_web_id):
    """Get all attributes and their current values for an element."""
    query = {
        "selectedFields": "Items.WebId;Items.Name;Items.Value;Items.Type",
    }
    resp = session.get(
        f"{base_url}/elements/{element_web_id}/attributes", params=query
    )
    resp.raise_for_status()
    return resp.json().get("Items", [])


def walk_af_tree(session, base_url, element_web_id, depth=0, max_depth=3):
    """Recursively walk the AF hierarchy and print the tree."""
    for child in get_child_elements(session, base_url, element_web_id):
        template = child.get("TemplateName", "")
        label = f" [{template}]" if template else ""
        print(f"{'  ' * depth}{child['Name']}{label}")

        # Recurse only while the element has children and the depth cap allows.
        if child.get("HasChildren") and depth < max_depth:
            walk_af_tree(session, base_url, child["WebId"],
                         depth + 1, max_depth)


# Usage
# List databases
dbs = get_af_databases(session, BASE_URL)
for db in dbs:
    print(f"Database: {db['Name']}")

# Walk the tree from the database root
if dbs:
    # Fetch the root elements of the first database, then print each subtree.
    resp = session.get(
        f"{BASE_URL}/assetdatabases/{dbs[0]['WebId']}/elements",
        params={"selectedFields": "Items.WebId;Items.Name;Items.HasChildren"},
    )
    for root in resp.json().get("Items", []):
        print(f"\n{root['Name']}")
        walk_af_tree(session, BASE_URL, root["WebId"])

Recipe 10: Event frame search and creation

Search for existing event frames and create new ones. Event frames represent time-bounded process events (batches, alarms, shifts).

event_frames.py (python)
def search_event_frames(session, base_url, database_web_id,
                        start_time="*-7d", end_time="*",
                        template_name=None, name_filter=None,
                        max_count=100):
    """Search for event frames in an AF database."""
    params = {
        "startTime": start_time,
        "endTime": end_time,
        "maxCount": max_count,
        "searchMode": "Overlapping",
        "selectedFields": (
            "Items.WebId;Items.Name;Items.TemplateName;"
            "Items.StartTime;Items.EndTime"
        ),
    }
    # Only include the optional filters the caller actually supplied.
    optional = {"templateName": template_name, "nameFilter": name_filter}
    params.update({k: v for k, v in optional.items() if v})

    url = f"{base_url}/assetdatabases/{database_web_id}/eventframes"
    resp = session.get(url, params=params)
    resp.raise_for_status()
    return resp.json().get("Items", [])


def create_event_frame(session, base_url, element_web_id,
                       name, start_time, end_time=None,
                       template_name=None, description=None):
    """Create a new event frame on an AF element.

    If end_time is None, creates an open (in-progress) event frame.
    """
    payload = {"Name": name, "StartTime": start_time}
    # Only send the optional fields the caller provided.
    for key, val in (("EndTime", end_time),
                     ("TemplateName", template_name),
                     ("Description", description)):
        if val:
            payload[key] = val

    resp = session.post(
        f"{base_url}/elements/{element_web_id}/eventframes",
        json=payload,
    )
    resp.raise_for_status()
    # The server reports the new event frame's URL in the Location header.
    return resp.headers.get("Location", "")


# Usage: search for recent event frames
# db_web_id: AF database WebID (obtained e.g. via Recipe 9's get_af_databases)
frames = search_event_frames(session, BASE_URL, db_web_id,
                             template_name="Batch")
for ef in frames:
    print(f"{ef['Name']}: {ef['StartTime']} -> {ef.get('EndTime', 'In progress')}")

# Usage: create a new event frame
# element_web_id: the AF element the event frame is attached to
location = create_event_frame(
    session, BASE_URL, element_web_id,
    name="Batch-2026-0315-001",
    start_time="2026-03-15T08:00:00Z",
    end_time="2026-03-15T16:00:00Z",
    template_name="Batch",
    description="Morning production batch",
)
print(f"Created event frame at: {location}")

Recipe 11: Incremental ETL with watermarks

A production-grade ETL pattern that reads new data since the last run using watermark tracking. Survives restarts and handles gaps gracefully.

incremental_etl.py (python)
import json
from pathlib import Path
from datetime import datetime, timezone

class WatermarkTracker:
    """Track extraction watermarks for incremental ETL.

    Stores the last successfully extracted timestamp per point
    so the next run picks up where the previous one left off.
    """

    def __init__(self, path: str = "watermarks.json"):
        # Load any watermarks persisted by a previous run.
        self.path = Path(path)
        self.marks = (
            json.loads(self.path.read_text()) if self.path.exists() else {}
        )

    def get(self, key: str, default: str = "*-24h") -> str:
        """Return the stored watermark for *key*, or *default* if absent."""
        return self.marks.get(key, default)

    def set(self, key: str, timestamp: str):
        """Record *timestamp* for *key* and persist to disk immediately."""
        self.marks[key] = timestamp
        self.path.write_text(json.dumps(self.marks, indent=2))


def incremental_extract(session, base_url, web_id, point_name,
                        tracker, max_count=10000):
    """Extract new recorded values since the last watermark.

    Returns: DataFrame of new values (empty if none)
    """
    import pandas as pd

    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params={
            "startTime": tracker.get(point_name),
            "endTime": "*",
            "maxCount": max_count,
            "boundaryType": "Outside",
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])

    if not items:
        return pd.DataFrame()

    frame = pd.DataFrame(items)
    frame["Timestamp"] = pd.to_datetime(frame["Timestamp"], utc=True)

    # Advance the watermark to the newest fetched timestamp so the next
    # run resumes from there.
    tracker.set(point_name, frame["Timestamp"].max().isoformat())

    return frame.set_index("Timestamp").sort_index()


# Usage
# temp_web_id / pressure_web_id: point WebIDs (see Recipe 2's lookups)
tracker = WatermarkTracker("my_pipeline_watermarks.json")

points = [
    ("Temperature", temp_web_id),
    ("Pressure", pressure_web_id),
]

for name, wid in points:
    df = incremental_extract(session, BASE_URL, wid, name, tracker)
    if not df.empty:
        print(f"{name}: extracted {len(df)} new values")
        # Save to your destination (database, CSV, data lake, etc.)
        # df.to_csv(f"{name}_extract.csv", mode="a", header=False)
    else:
        print(f"{name}: no new data")

Production ETL

For production workloads, run this script via a task scheduler (cron, Windows Task Scheduler, Airflow) instead of a loop. Store data in a database rather than CSV. Add monitoring to detect when extraction falls behind.

Recipe 12: Health check

Monitor PI Web API availability and data freshness. Useful for operations dashboards and alerting.

health_check.py (python)
from datetime import datetime, timezone, timedelta

def pi_health_check(session, base_url, watchlist_web_ids=None,
                    stale_threshold_minutes=30):
    """Check PI Web API health and data freshness.

    Performs up to three checks:
      1. API reachability (root endpoint round-trip + latency)
      2. Data Archive connectivity (per-server IsConnected flag)
      3. Data freshness for an optional watchlist of points

    Args:
        watchlist_web_ids: Optional list of (name, web_id) to check freshness
        stale_threshold_minutes: Flag data older than this as stale

    Returns: dict with overall status ("healthy" / "warning" / "degraded" /
        "down") and per-check details
    """
    import time
    result = {"status": "healthy", "checks": {}}

    # Check 1: API reachability. If the root endpoint is unreachable,
    # nothing else can work, so bail out early.
    start = time.perf_counter()
    try:
        resp = session.get(f"{base_url}/", timeout=10)
        latency = time.perf_counter() - start
        result["checks"]["api"] = {
            "status": "ok" if resp.ok else "error",
            "latency_ms": round(latency * 1000),
            "status_code": resp.status_code,
        }
        if not resp.ok:
            result["status"] = "degraded"
    except Exception as e:
        result["checks"]["api"] = {"status": "error", "error": str(e)}
        result["status"] = "down"
        return result

    # Check 2: Data server connectivity.
    resp = session.get(f"{base_url}/dataservers",
                       params={"selectedFields": "Items.Name;Items.IsConnected"})
    if resp.ok:
        servers = resp.json().get("Items", [])
        disconnected = [s["Name"] for s in servers if not s.get("IsConnected")]
        result["checks"]["data_servers"] = {
            "status": "ok" if not disconnected else "error",
            "total": len(servers),
            "disconnected": disconnected,
        }
        if disconnected:
            result["status"] = "degraded"

    # Check 3: Data freshness (optional watchlist).
    if watchlist_web_ids:
        stale = []
        threshold = datetime.now(timezone.utc) - timedelta(
            minutes=stale_threshold_minutes
        )
        for name, wid in watchlist_web_ids:
            try:
                resp = session.get(
                    f"{base_url}/streams/{wid}/value",
                    params={"selectedFields": "Timestamp"},
                    timeout=5,
                )
                if resp.ok:
                    ts = datetime.fromisoformat(
                        resp.json()["Timestamp"].replace("Z", "+00:00")
                    )
                    if ts < threshold:
                        stale.append({"name": name, "last_update": ts.isoformat()})
                else:
                    # BUGFIX: an HTTP error on the read was previously
                    # ignored silently, hiding broken points from the report.
                    stale.append({"name": name,
                                  "error": f"HTTP {resp.status_code}"})
            except Exception:
                stale.append({"name": name, "error": "read failed"})

        result["checks"]["data_freshness"] = {
            "status": "ok" if not stale else "warning",
            "stale_points": stale,
            "threshold_minutes": stale_threshold_minutes,
        }
        # Stale data only downgrades a fully-healthy status; it never
        # masks an existing "degraded" or "down".
        if stale and result["status"] == "healthy":
            result["status"] = "warning"

    return result


# Usage
# reactor_temp_wid / feed_flow_wid: point WebIDs (see Recipe 2's lookups)
health = pi_health_check(session, BASE_URL, watchlist_web_ids=[
    ("Reactor_Temp", reactor_temp_wid),
    ("Feed_Flow", feed_flow_wid),
])
print(f"Overall status: {health['status']}")
for check, details in health["checks"].items():
    print(f"  {check}: {details['status']}")

Get the complete cookbook

Download all recipes

Get the complete PI Web API Cookbook with all recipes above, plus additional patterns for connection pooling, multi-server setups, and channel subscriptions.

Get the full cookbook