How-to Guide

Batch Requests

The PI Web API batch endpoint combines multiple API calls into a single HTTP request. This guide covers parallel and sequential execution, the RequestTemplate pattern for efficient multi-point operations, error handling for partial failures, performance tuning, and real-world bulk read/write patterns.

Why use batch requests?

Every HTTP request to PI Web API has overhead: TCP handshake, TLS negotiation, authentication, and network latency. When you need to read 200 PI points, making 200 individual requests takes 20-60 seconds over a WAN. The /batch endpoint sends all 200 sub-requests in a single HTTP call, and PI Web API executes them server-side.

| Approach | Time for 200 points | Network calls | Auth negotiations |
| --- | --- | --- | --- |
| Individual requests | 20-60 seconds | 200 | 200 (or 1 with session reuse) |
| Batch request | 0.5-3 seconds | 1 | 1 |
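
The batch side of that comparison can be sketched with a small builder (a hypothetical helper; `BASE_URL` and the `requests.Session` named `session` follow the conventions used in the examples below):

```python
def build_value_batch(base_url: str, web_ids: list[str]) -> dict:
    """Build a /batch body that reads the current value of every point.

    200 WebIDs become 200 sub-requests in a single POST, replacing 200
    individual GETs and their per-request network round trips.
    """
    return {
        f"read_{i}": {
            "Method": "GET",
            "Resource": f"{base_url}/streams/{web_id}/value",
        }
        for i, web_id in enumerate(web_ids)
    }

# One HTTP call instead of len(web_ids) calls:
# results = session.post(f"{BASE_URL}/batch",
#                        json=build_value_batch(BASE_URL, web_ids)).json()
```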

Parallel vs sequential execution

PI Web API supports two execution modes within a batch. Understanding the difference is critical for correctness.

| Mode | How to use | When to use |
| --- | --- | --- |
| Parallel (default) | Omit ParentIds. All sub-requests run concurrently. | Independent reads or writes where no sub-request depends on another. |
| Sequential (chained) | Add ParentIds to create dependencies between sub-requests. | Workflows where one result feeds into the next (e.g., look up element, then read its attributes). |

Parallel is always faster

Sub-requests without ParentIds run in parallel on the server. Only add ParentIds when a sub-request genuinely depends on the output of another. Unnecessary chaining serializes execution and slows the entire batch.
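
To make the two modes concrete, here is a minimal batch body in each form (server name, paths, and sub-request names are placeholders). Parallel, with no ParentIds, both sub-requests run concurrently:

```json
{
  "GetTemp": { "Method": "GET", "Resource": "https://srv/piwebapi/streams/TEMP_ID/value" },
  "GetFlow": { "Method": "GET", "Resource": "https://srv/piwebapi/streams/FLOW_ID/value" }
}
```

Sequential: GetAttrs waits for GetElement, and the JSONPath result is substituted into the {0} placeholder:

```json
{
  "GetElement": {
    "Method": "GET",
    "Resource": "https://srv/piwebapi/elements?path=\\\\AF-SERVER\\Database\\Reactor01"
  },
  "GetAttrs": {
    "Method": "GET",
    "Resource": "{0}",
    "ParentIds": ["GetElement"],
    "Parameters": ["$.GetElement.Content.Links.Attributes"]
  }
}
```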

Basic batch structure

A batch request is a POST to /batch with a JSON body. Each key is a unique name you assign, and each value describes one sub-request with Method, Resource, and optionally Content (for POST/PUT), Headers, and Parameters.

batch_structure.py
# Each sub-request specifies Method, Resource, and optionally Content
batch_request = {
    "GetTemp": {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/streams/{TEMP_WEB_ID}/value"
            "?selectedFields=Timestamp;Value;Good"
        ),
    },
    "GetPressure": {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/streams/{PRESSURE_WEB_ID}/value"
            "?selectedFields=Timestamp;Value;Good"
        ),
    },
    "GetFlow": {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/streams/{FLOW_WEB_ID}/value"
            "?selectedFields=Timestamp;Value;Good"
        ),
    },
}

response = session.post(f"{BASE_URL}/batch", json=batch_request)
results = response.json()

# Each result has its own Status, Headers, and Content
for name, result in results.items():
    if result["Status"] == 200:
        content = result["Content"]
        print(f"{name}: {content['Value']} at {content['Timestamp']}")
    else:
        print(f"{name}: Error {result['Status']}")
        if "Content" in result:
            print(f"  Details: {result['Content']}")

Expected response structure:

batch_response.json
{
  "GetTemp": {
    "Status": 200,
    "Headers": { "Content-Type": "application/json" },
    "Content": {
      "Timestamp": "2025-03-15T14:30:00Z",
      "Value": 72.5,
      "Good": true
    }
  },
  "GetPressure": {
    "Status": 200,
    "Headers": { "Content-Type": "application/json" },
    "Content": {
      "Timestamp": "2025-03-15T14:30:00Z",
      "Value": 14.7,
      "Good": true
    }
  }
}

Bulk tag reads: current values

The most common batch pattern: reading the current snapshot of many PI points at once.

batch_read_current.py
def read_current_values_batch(
    session, base_url: str, web_ids: list[str]
) -> dict[str, dict]:
    """Read current values for multiple PI points in one HTTP call.

    Returns a dict keyed by WebID with value, timestamp, and quality.
    """
    batch = {}
    for web_id in web_ids:
        batch[web_id] = {
            "Method": "GET",
            "Resource": (
                f"{base_url}/streams/{web_id}/value"
                "?selectedFields=Timestamp;Value;Good"
            ),
        }

    response = session.post(f"{base_url}/batch", json=batch)
    response.raise_for_status()

    results = {}
    for web_id, result in response.json().items():
        if result["Status"] == 200:
            c = result["Content"]
            results[web_id] = {
                "value": c["Value"],
                "timestamp": c["Timestamp"],
                "good": c.get("Good", True),
            }
        else:
            results[web_id] = {
                "error": result["Status"],
                "message": result.get("Content", {}).get("Message", ""),
            }

    return results


# Usage — read 150 points in one call
web_ids = ["F1DPaH...", "F1DPbX...", "F1DPcZ..."]  # up to hundreds
values = read_current_values_batch(session, BASE_URL, web_ids)

for wid, val in values.items():
    if "error" in val:
        print(f"{wid}: ERROR {val['error']} - {val['message']}")
    else:
        print(f"{wid}: {val['value']} (good={val['good']})")

Bulk tag reads: recorded history

Batch historical queries for multiple points. Each sub-request can have its own time range and parameters.

batch_read_recorded.py
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
start = (now - timedelta(hours=24)).isoformat()
end = now.isoformat()

batch = {}
for web_id in web_ids:
    batch[web_id] = {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/streams/{web_id}/recorded"
            f"?startTime={start}&endTime={end}"
            f"&maxCount=10000"
            f"&selectedFields=Items.Timestamp;Items.Value;Items.Good"
        ),
    }

response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()

for web_id, result in results.items():
    if result["Status"] == 200:
        items = result["Content"]["Items"]
        print(f"{web_id}: {len(items)} values")
        # Check for truncation
        if len(items) == 10000:
            print("  WARNING: results may be truncated at maxCount")
    else:
        print(f"{web_id}: Error {result['Status']}")

Watch for maxCount truncation

If a sub-request returns exactly maxCount values, the data is likely truncated. Either increase maxCount, narrow the time range, or paginate each sub-request individually. Batch does not paginate individual sub-requests for you.
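
A sketch of that per-point follow-up pagination (a hypothetical helper; assumes the same `session` and that you kept the last timestamp returned by the truncated sub-request):

```python
def fetch_remaining(
    session, base_url: str, web_id: str,
    last_timestamp: str, end_time: str, max_count: int = 10000,
) -> list[dict]:
    """Page through recorded values after a truncated batch result.

    startTime is inclusive in PI Web API, so each page may begin with
    the previous page's last value; that duplicate is dropped.
    """
    items = []
    start = last_timestamp
    while True:
        resp = session.get(
            f"{base_url}/streams/{web_id}/recorded",
            params={"startTime": start, "endTime": end_time,
                    "maxCount": max_count},
        )
        resp.raise_for_status()
        page = resp.json()["Items"]
        done = len(page) < max_count  # a short page means no more data
        if page and page[0]["Timestamp"] == start:
            page = page[1:]  # drop the inclusive-boundary duplicate
        items.extend(page)
        if done or not page:
            break
        start = items[-1]["Timestamp"]
    return items
```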

Multi-point writes

Write current values to multiple PI points in one HTTP call. The write is not atomic: individual sub-requests can still fail, so check each result. The Content field must be a JSON object, not a string.

batch_write.py
from datetime import datetime, timezone

now = datetime.now(timezone.utc).isoformat()

points_to_write = [
    {"web_id": "F1DPaH...", "value": 72.5,  "name": "temperature"},
    {"web_id": "F1DPbX...", "value": 14.7,  "name": "pressure"},
    {"web_id": "F1DPcZ...", "value": 150.0, "name": "flow_rate"},
]

batch = {}
for point in points_to_write:
    batch[point["name"]] = {
        "Method": "POST",
        "Resource": f"{BASE_URL}/streams/{point['web_id']}/value",
        # Content MUST be a JSON object, not a string
        "Content": {
            "Value": point["value"],
            "Timestamp": now,
        },
    }

response = session.post(f"{BASE_URL}/batch", json=batch)

# Check each sub-request individually
succeeded = 0
failed = 0
for name, result in response.json().items():
    status = result["Status"]
    if status in (200, 202, 204):
        succeeded += 1
    else:
        failed += 1
        msg = result.get("Content", {})
        if isinstance(msg, dict):
            msg = msg.get("Message", str(msg))
        print(f"FAILED {name}: HTTP {status} - {msg}")

print(f"Writes: {succeeded} succeeded, {failed} failed")

Content must be a JSON object

A common mistake is setting Content to a JSON string (e.g., using json.dumps() or an f-string). PI Web API expects the Content field to be an actual JSON object in the batch body, not a serialized string. If you pass a string, you will get 400 Bad Request on that sub-request.
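
A minimal contrast of the two forms (the URL is a placeholder):

```python
import json

point_url = "https://srv/piwebapi/streams/WEB_ID/value"  # placeholder

# WRONG: Content becomes a JSON *string* inside the batch body;
# this sub-request will fail with 400 Bad Request
wrong = {
    "Write": {
        "Method": "POST",
        "Resource": point_url,
        "Content": json.dumps({"Value": 72.5}),
    }
}

# RIGHT: Content is a plain dict; requests' json= serializes the whole
# batch body once, so Content stays a nested JSON object
right = {
    "Write": {
        "Method": "POST",
        "Resource": point_url,
        "Content": {"Value": 72.5},
    }
}
```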

Bulk historical writes

Write multiple historical values to multiple points in one batch call. Each sub-request writes an array of timestamped values to one point.

batch_write_historical.py
from datetime import datetime, timedelta, timezone

# Generate sample historical data for two points
base_time = datetime(2025, 3, 15, 0, 0, 0, tzinfo=timezone.utc)

batch = {
    "WriteTemp": {
        "Method": "POST",
        "Resource": f"{BASE_URL}/streams/{TEMP_WEB_ID}/recorded",
        "Content": [
            {
                "Timestamp": (base_time + timedelta(minutes=i * 5)).isoformat(),
                "Value": 70.0 + (i * 0.1),
            }
            for i in range(288)  # 24 hours at 5-minute intervals
        ],
    },
    "WritePressure": {
        "Method": "POST",
        "Resource": f"{BASE_URL}/streams/{PRESSURE_WEB_ID}/recorded",
        "Content": [
            {
                "Timestamp": (base_time + timedelta(minutes=i * 5)).isoformat(),
                "Value": 14.5 + (i * 0.005),
            }
            for i in range(288)
        ],
    },
}

response = session.post(f"{BASE_URL}/batch", json=batch)

for name, result in response.json().items():
    if result["Status"] in (200, 202, 204):
        print(f"{name}: OK")
    else:
        # Check for per-item errors in the response
        content = result.get("Content", {})
        if isinstance(content, dict) and "Errors" in content:
            print(f"{name}: {len(content['Errors'])} item errors")
            for err in content["Errors"][:3]:
                print(f"  {err}")
        else:
            print(f"{name}: Error {result['Status']} - {content}")

Dependent requests with ParentIds

Some batch workflows require chaining: first look up a resource, then use its WebID in a follow-up request. Use ParentIds and Parameters with JSONPath expressions to create dependencies.

Simple chain: look up point, then read value

batch_dependent_simple.py
batch = {
    "FindPoint": {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/points"
            f"?path=\\\\YOUR-SERVER\\sinusoid"
        ),
    },
    "ReadValue": {
        "Method": "GET",
        "Resource": "{0}?selectedFields=Timestamp;Value;Good",
        "ParentIds": ["FindPoint"],
        "Parameters": ["$.FindPoint.Content.Links.Value"],
    },
}

response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()

if results["ReadValue"]["Status"] == 200:
    content = results["ReadValue"]["Content"]
    print(f"Value: {content['Value']} at {content['Timestamp']}")
else:
    print(f"Error: {results['ReadValue']['Status']}")

Realistic chain: element, then attributes, then values

batch_dependent_chain.py
# Three-step chain: find AF element -> get its attributes -> read values
batch = {
    # Step 1: Look up an AF element by path
    "FindElement": {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/elements"
            f"?path=\\\\YOUR-AF-SERVER\\Database\\Plant1\\Reactor01"
        ),
    },
    # Step 2: Get the element's attributes (depends on Step 1)
    "GetAttributes": {
        "Method": "GET",
        "Resource": "{0}?selectedFields=Items.Name;Items.WebId;Items.Links",
        "ParentIds": ["FindElement"],
        "Parameters": ["$.FindElement.Content.Links.Attributes"],
    },
}

response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()

# Parse the attribute list
if results["GetAttributes"]["Status"] == 200:
    attributes = results["GetAttributes"]["Content"]["Items"]
    for attr in attributes:
        print(f"  {attr['Name']}: {attr['WebId']}")

    # Now batch-read all attribute values in a second batch call
    value_batch = {}
    for attr in attributes:
        value_batch[attr["Name"]] = {
            "Method": "GET",
            "Resource": (
                f"{BASE_URL}/streams/{attr['WebId']}/value"
                "?selectedFields=Timestamp;Value;Good"
            ),
        }

    value_response = session.post(f"{BASE_URL}/batch", json=value_batch)
    for attr_name, result in value_response.json().items():
        if result["Status"] == 200:
            print(f"  {attr_name} = {result['Content']['Value']}")
else:
    print(f"Error: {results['GetAttributes']['Status']}")

How ParentIds and Parameters work

When a sub-request has ParentIds, it waits for those parents to complete first. The Parameters array uses JSONPath expressions (prefixed with $.) to extract values from parent responses. These are injected into the child request's Resource URL using positional placeholders: {0}, {1}, etc. If a parent fails, the dependent sub-request is skipped and returns a 409 Conflict status.
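
One way to make those 409s actionable is to map each skipped sub-request back to the parent that actually failed (a hypothetical helper, not part of the API):

```python
def explain_skipped(batch: dict, results: dict) -> list[str]:
    """Report which parent failed for each 409-skipped sub-request.

    `batch` is the request body you sent (including ParentIds);
    `results` is the parsed /batch response.
    """
    messages = []
    for name, result in results.items():
        if result["Status"] != 409:
            continue
        for parent in batch.get(name, {}).get("ParentIds", []):
            parent_status = results.get(parent, {}).get("Status", 0)
            if parent_status >= 400:
                messages.append(
                    f"{name} skipped: parent {parent} "
                    f"failed with {parent_status}"
                )
    return messages
```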

RequestTemplate for efficient multi-point reads

When you need to make the same type of request for many resources (e.g., reading the current value of 500 points), the RequestTemplate and Parameters pattern is more compact than writing 500 separate sub-request objects.

batch_request_template.py
# RequestTemplate: define the request pattern once; the server expands
# it into one sub-request per value matched by the JSONPath selector
batch = {
    # Parent: list the attributes of a known AF element
    "GetAttributes": {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/elements/{ELEMENT_WEB_ID}/attributes"
            "?selectedFields=Items.Name;Items.WebId"
        ),
    },
    # Child: one expanded value read per WebId in the parent's Items.
    # The template carries only a Resource; {0} is filled from each
    # value the Parameters JSONPath matches.
    "ReadValues": {
        "Method": "GET",
        "RequestTemplate": {
            "Resource": (
                f"{BASE_URL}/streams/{{0}}/value"
                "?selectedFields=Timestamp;Value;Good"
            ),
        },
        "ParentIds": ["GetAttributes"],
        "Parameters": ["$.GetAttributes.Content.Items[*].WebId"],
    },
}

response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()

# A templated sub-request returns a collection of responses, one per
# expanded request, in the order the JSONPath matched
if results["ReadValues"]["Status"] == 200:
    items = results["ReadValues"]["Content"]["Items"]
    print(f"Read {len(items)} attribute values in one call")

Consider streamsets for pure multi-point reads

If you are reading the same type of value (current, recorded, interpolated) for many points, the /streamsets endpoints are often faster than batch. The POST /streamsets/value endpoint accepts an array of WebIDs and returns all values in one call, without the overhead of named sub-requests. Use batch when you need mixed operations (reads + writes + lookups) in one call.
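
A sketch of the ad hoc read using the GET form, which takes repeated webId query parameters (passing a list to requests' `params` produces `webId=...&webId=...`; the selectedFields names follow the streamsets response shape):

```python
def read_streamset_values(session, base_url: str,
                          web_ids: list[str]) -> dict:
    """Read current values for many points via one ad hoc streamset call.

    Returns a dict keyed by WebID with the timed value for each point.
    """
    resp = session.get(
        f"{base_url}/streamsets/value",
        params={
            "webId": web_ids,  # list -> repeated webId parameters
            "selectedFields": (
                "Items.WebId;Items.Value.Timestamp;Items.Value.Value"
            ),
        },
    )
    resp.raise_for_status()
    return {item["WebId"]: item["Value"] for item in resp.json()["Items"]}
```

For very long WebID lists that would overflow the URL, prefer the POST form described above.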

Batch size limits and server configuration

PI Web API limits batch requests at several levels. These defaults are configurable by the PI administrator in the PI Web API Admin utility.

| Setting | Location | Default | Notes |
| --- | --- | --- | --- |
| BatchLimit | System Configuration | Varies (often 500) | Maximum sub-requests per batch call |
| Request body size | IIS / PI Web API config | 4 MB - 28 MB | Total JSON payload size limit |
| Request timeout | IIS configuration | 110 seconds | Large batches may exceed this |
| MaxItemsPerCall | System Configuration | Varies | Limits items returned per individual sub-request |

Finding your server's limits

If you exceed the batch limit, PI Web API returns 413 Request Entity Too Large or 400 Bad Request with a message indicating the limit. Ask your PI administrator to check PI Web API Admin > System Configuration for the BatchLimit setting. On most production servers, the default is sufficient for 200-500 sub-requests per batch.

Chunked batch for large operations

When you need to process more items than the server allows in a single batch, split into chunks. Optimal chunk size depends on your server and the complexity of each sub-request.

chunked_batch.py
import time
import logging


logger = logging.getLogger(__name__)


def chunked_batch(
    session,
    base_url: str,
    batch_items: dict,
    chunk_size: int = 200,
    delay_between_chunks: float = 0.1,
) -> dict:
    """Execute a large batch in chunks, respecting server limits.

    Args:
        batch_items: Full batch dict (may exceed server limit)
        chunk_size: Sub-requests per chunk (tune based on server config)
        delay_between_chunks: Seconds to wait between chunks (avoids
            overwhelming the server)

    Returns:
        Combined results from all chunks
    """
    keys = list(batch_items.keys())
    all_results = {}
    total_chunks = (len(keys) + chunk_size - 1) // chunk_size

    for chunk_num, i in enumerate(range(0, len(keys), chunk_size)):
        chunk_keys = keys[i : i + chunk_size]
        chunk = {k: batch_items[k] for k in chunk_keys}

        logger.info(
            f"Batch chunk {chunk_num + 1}/{total_chunks}: "
            f"{len(chunk)} sub-requests"
        )

        response = session.post(f"{base_url}/batch", json=chunk)
        response.raise_for_status()
        all_results.update(response.json())

        # Small delay to avoid overwhelming the server
        if i + chunk_size < len(keys) and delay_between_chunks > 0:
            time.sleep(delay_between_chunks)

    return all_results


# Example: read current values for 2000 points
big_batch = {}
for i, web_id in enumerate(large_web_id_list):
    big_batch[f"read_{i}"] = {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/streams/{web_id}/value"
            "?selectedFields=Timestamp;Value;Good"
        ),
    }

# Process in chunks of 200 (safe default for most servers)
results = chunked_batch(session, BASE_URL, big_batch, chunk_size=200)
print(f"Read {len(results)} points across {(len(big_batch) + 199) // 200} chunks")

Tuning chunk size

Start with a chunk size of 200. If your server is fast and returns quickly, increase to 400-500. If you see timeouts or 503 errors, decrease to 100. For sub-requests that return large payloads (historical data), use a smaller chunk size (50-100) because the response body can be very large. For simple current-value reads, you can often go up to 500 per chunk.

Error handling for partial batch failures

A batch response can be a mix of successes and failures. The top-level HTTP status is typically 207 Multi-Status when there are mixed results, but you should always check each sub-request individually.

batch_error_handling.py
from dataclasses import dataclass


@dataclass
class BatchResult:
    """Parsed result of a batch sub-request."""
    name: str
    status: int
    content: dict | list | None
    success: bool

    @property
    def error_message(self) -> str:
        if self.success:
            return ""
        if isinstance(self.content, dict):
            return self.content.get("Message", str(self.content))
        return str(self.content) if self.content else f"HTTP {self.status}"


def execute_batch(session, base_url: str, batch: dict) -> list[BatchResult]:
    """Execute a batch request and return parsed results.

    Raises on transport-level errors (no response at all).
    Does NOT raise on individual sub-request failures.
    """
    response = session.post(f"{base_url}/batch", json=batch)

    # If the batch endpoint itself fails (not individual sub-requests)
    if response.status_code not in (200, 207, 202):
        response.raise_for_status()

    results = []
    for name, result in response.json().items():
        status = result["Status"]
        results.append(BatchResult(
            name=name,
            status=status,
            content=result.get("Content"),
            success=status in (200, 202, 204),
        ))

    return results


# Usage
results = execute_batch(session, BASE_URL, my_batch)

succeeded = [r for r in results if r.success]
failed = [r for r in results if not r.success]

print(f"Succeeded: {len(succeeded)}")
print(f"Failed: {len(failed)}")

for r in failed:
    print(f"  {r.name}: HTTP {r.status} - {r.error_message}")

# Common failure patterns:
# 404 - Point or element does not exist (wrong WebID or deleted)
# 403 - No permission on this specific point
# 409 - Dependent request failed (parent had an error)
# 502 - PI Data Archive is unreachable from PI Web API server

A successful batch status does not mean all sub-requests succeeded

PI Web API returns a success-level top-level status (typically 207 Multi-Status) even when individual sub-requests have failed. You must iterate through every result and check each Status field. A common bug is to only check response.status_code and assume all is well.

Retry failed sub-requests

When some sub-requests in a batch fail (e.g., 503 from a temporarily overloaded server), you can retry just the failed ones without re-executing the successful ones.

batch_retry.py
import time


def batch_with_retry(
    session, base_url: str, batch: dict,
    max_retries: int = 3, retry_statuses: frozenset = frozenset({502, 503}),
) -> dict:
    """Execute batch with automatic retry for transient failures.

    Only retries sub-requests that failed with retryable status codes.
    Sub-requests that succeeded or failed permanently are not retried.
    """
    remaining = dict(batch)
    all_results = {}

    for attempt in range(max_retries + 1):
        if not remaining:
            break

        response = session.post(f"{base_url}/batch", json=remaining)
        response.raise_for_status()

        retry_batch = {}
        for name, result in response.json().items():
            status = result["Status"]
            if status in retry_statuses and attempt < max_retries:
                # This sub-request can be retried
                retry_batch[name] = remaining[name]
            else:
                # Final result (success or permanent failure)
                all_results[name] = result

        remaining = retry_batch
        if remaining:
            wait = 2 ** attempt
            print(f"Retrying {len(remaining)} sub-requests in {wait}s...")
            time.sleep(wait)

    return all_results

Real-world pattern: dashboard snapshot

A common production pattern: reading current values, 24-hour summaries, and alarm status for a set of points in a single batch call to populate a dashboard.

batch_dashboard.py
from datetime import datetime, timedelta, timezone

now = datetime.now(timezone.utc)
yesterday = (now - timedelta(hours=24)).isoformat()

# Define the dashboard points
dashboard_points = {
    "reactor_temp": REACTOR_TEMP_WEB_ID,
    "reactor_pressure": REACTOR_PRESSURE_WEB_ID,
    "feed_flow": FEED_FLOW_WEB_ID,
    "product_flow": PRODUCT_FLOW_WEB_ID,
}

# Build a batch with BOTH current values AND 24h summaries
batch = {}
for name, web_id in dashboard_points.items():
    # Current value
    batch[f"{name}_current"] = {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/streams/{web_id}/value"
            "?selectedFields=Timestamp;Value;Good"
        ),
    }
    # 24-hour summary (min, max, average)
    batch[f"{name}_summary"] = {
        "Method": "GET",
        "Resource": (
            f"{BASE_URL}/streams/{web_id}/summary"
            f"?startTime={yesterday}&endTime={now.isoformat()}"
            f"&summaryType=Minimum&summaryType=Maximum&summaryType=Average"
            f"&selectedFields=Items.Type;Items.Value.Value"
        ),
    }

# One HTTP call for 4 current values + 4 summaries = 8 sub-requests
response = session.post(f"{BASE_URL}/batch", json=batch)
results = response.json()

# Parse into a dashboard-friendly structure
for name in dashboard_points:
    current = results.get(f"{name}_current", {})
    summary = results.get(f"{name}_summary", {})

    if current.get("Status") == 200:
        val = current["Content"]["Value"]
        print(f"{name}: current = {val}")

    if summary.get("Status") == 200:
        for item in summary["Content"].get("Items", []):
            stype = item["Type"]
            sval = item["Value"]["Value"]
            print(f"  {stype}: {sval:.2f}")

Performance guidelines

| Guideline | Details |
| --- | --- |
| Use selectedFields in every sub-request | Reduces response size by 50-80%. Especially important in batch because savings multiply across all sub-requests. |
| Keep chunks under 200 for historical data | Sub-requests with large responses (recorded values) consume more server memory. Smaller chunks prevent timeouts. |
| Use /streamsets for same-type reads | If all sub-requests read the same endpoint type (e.g., current value), the streamsets endpoint is faster than batch. |
| Avoid unnecessary ParentIds | Parallel execution is always faster. Only chain requests when genuinely needed. |
| Cache WebIDs | Do not include WebID lookups in every batch. Look up once, cache, and reuse. |
| Monitor response time | If batch calls take more than 10 seconds, reduce chunk size or narrow time ranges for historical queries. |
