Pagination & Limits
PI Web API limits response sizes to protect server performance. Understanding maxCount behavior, time-based pagination, continuation tokens, and memory-efficient streaming patterns is essential for retrieving large datasets reliably without missing data.
How PI Web API limits responses
PI Web API applies limits at multiple levels. These are configurable by the PI administrator.
| Setting | Default | Location | What it affects |
|---|---|---|---|
| maxCount (query parameter) | 1,000 | Per-request | Maximum items returned for stream endpoints (/recorded, /interpolated, etc.) |
| MaxItemsPerCall | Varies (often 150,000) | PI Web API Admin > System Configuration | Server-wide upper bound on maxCount. Your request cannot exceed this. |
| BatchLimit | Varies (often 500) | PI Web API Admin > System Configuration | Maximum sub-requests per batch call |
| Search results | 10 (default count) | Per-request | Indexed Search results per page (use count parameter, max varies) |
| Request body size | 4 MB - 28 MB | IIS configuration | Maximum POST body size (affects batch and write payloads) |
maxCount behavior and defaults
The maxCount parameter controls how many values the server returns from stream endpoints. If the time range contains more values than maxCount, the response is silently truncated -- no error, no warning header.
# Default behavior: maxCount=1000 (if not specified)
response = session.get(
f"{BASE_URL}/streams/{WEB_ID}/recorded",
params={"startTime": "*-30d", "endTime": "*"},
)
items = response.json()["Items"]
# If the point has 50,000 values in 30 days, you only get 1,000!
# Explicitly set maxCount higher
response = session.get(
f"{BASE_URL}/streams/{WEB_ID}/recorded",
params={
"startTime": "*-30d",
"endTime": "*",
"maxCount": 50000,
"selectedFields": "Items.Timestamp;Items.Value;Items.Good",
},
)
items = response.json()["Items"]
print(f"Returned {len(items)} values")
# CRITICAL: detect truncation
if len(items) == 50000:
    print("WARNING: Results may be truncated. Paginate to get all data.")
Silent truncation is the number one data quality bug
PI Web API does not return a header or flag indicating truncation. If you request maxCount=1000 and get exactly 1,000 results, your data is almost certainly truncated. Always check len(items) == maxCount and paginate if true. Many production bugs trace back to silently truncated data that was never detected.
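Because the check is easy to forget, it helps to wrap it in a tiny guard. A minimal sketch (the helper name is ours; the heuristic itself is just the length comparison described above):

```python
def is_possibly_truncated(items: list, max_count: int) -> bool:
    """True when a page came back exactly full, i.e. results may be cut off."""
    return len(items) >= max_count

# A page of exactly maxCount values must be treated as truncated:
full_page = [{"Timestamp": f"t{i}"} for i in range(1000)]
print(is_possibly_truncated(full_page, 1000))        # True  -> paginate
print(is_possibly_truncated(full_page[:999], 1000))  # False -> complete
```

Raising an exception instead of returning a bool is equally valid; the point is that the check runs on every stream call.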
Estimating data volume before querying
Use the Count summary type to estimate how many recorded values exist in a time range before retrieving them.
def estimate_value_count(session, base_url, web_id, start, end):
"""Estimate the number of recorded values in a time range.
Uses the Count summary to get an approximate count without
transferring the actual data.
"""
resp = session.get(
f"{base_url}/streams/{web_id}/summary",
params={
"startTime": start,
"endTime": end,
"summaryType": "Count",
"selectedFields": "Items.Value.Value",
},
)
resp.raise_for_status()
items = resp.json().get("Items", [])
if items:
return int(items[0]["Value"]["Value"])
return 0
# Check before querying
count = estimate_value_count(session, BASE_URL, WEB_ID, "*-30d", "*")
print(f"Estimated values in 30 days: {count}")
if count > 100000:
print("Large dataset — use paginated retrieval")
# Use time-based pagination (see below)
elif count > 0:
print(f"Setting maxCount={count + 1000} to ensure no truncation")
    # Safe to retrieve in one call
Time-based pagination for recorded values
The most reliable way to retrieve all recorded values from a large time range: paginate by time, using the timestamp of the last returned value as the start of the next page.
from datetime import datetime, timedelta, timezone
def get_all_recorded_values(
session, base_url: str, web_id: str,
start: str, end: str,
max_count: int = 10000,
selected_fields: str = "Items.Timestamp;Items.Value;Items.Good",
) -> list[dict]:
    """Retrieve all recorded values using time-based pagination.

    Uses the timestamp of the last returned value as the start of the
    next page, then drops the first value of the new page when it
    repeats that timestamp (the start boundary is inclusive).
Args:
start: Start time (PI time string or ISO 8601)
end: End time
max_count: Values per page (higher = fewer HTTP calls, more memory)
Returns:
Complete list of all recorded values in the time range
"""
all_values = []
current_start = start
page = 0
while True:
page += 1
response = session.get(
f"{base_url}/streams/{web_id}/recorded",
params={
"startTime": current_start,
"endTime": end,
"maxCount": max_count,
"selectedFields": selected_fields,
},
)
response.raise_for_status()
        items = response.json().get("Items", [])
        if not items:
            break
        fetched = len(items)  # page size before any boundary trimming
        # On the first page, take all items. On later pages, drop the
        # first item when it repeats the previous page's last timestamp
        # (the inclusive start boundary returns it again).
        if page > 1 and items[0]["Timestamp"] == all_values[-1]["Timestamp"]:
            items = items[1:]
            if not items:
                break
        all_values.extend(items)
        # A short page (before trimming) means the server had no more data
        if fetched < max_count:
            break
break
# Use the last timestamp as the start of the next page
current_start = items[-1]["Timestamp"]
return all_values
# Usage: get all values for the last 90 days
values = get_all_recorded_values(
session, BASE_URL, WEB_ID,
start="*-90d",
end="*",
max_count=50000,
)
print(f"Total values retrieved: {len(values)}")
The duplicate-timestamp edge case
When paginating by time, the last value of page N and the first value of page N+1 can have the same timestamp (PI can store multiple values at the exact same timestamp). The code above handles this by comparing timestamps at the page boundary. A simpler but less correct approach is to add 1 millisecond to the start time, but this risks missing values that genuinely share a timestamp.
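The boundary handling can be factored into a pure helper so the edge case is unit-testable in isolation. The dict shape mirrors the Items entries used above; the function name is ours:

```python
def drop_boundary_duplicate(prev_last_timestamp, items):
    """Drop a new page's first item when it repeats the previous page's
    last timestamp (the inclusive start boundary returns it again)."""
    if items and prev_last_timestamp == items[0]["Timestamp"]:
        return items[1:]
    return items

page2 = [
    {"Timestamp": "2024-01-01T00:10:00Z", "Value": 1.0},  # boundary repeat
    {"Timestamp": "2024-01-01T00:20:00Z", "Value": 2.0},
]
print(drop_boundary_duplicate("2024-01-01T00:10:00Z", page2))
# only the 00:20:00Z item remains
```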
Continuation tokens
Some PI Web API endpoints support continuation tokens for stateful pagination. When available, the response includes a Links.Next URL that contains a continuation token. This is more reliable than time-based pagination because the server tracks your position.
def get_all_with_continuation(session, initial_url: str) -> list[dict]:
"""Paginate using continuation tokens (Links.Next).
Works with endpoints that support continuation:
- /elements/{webId}/elements (child elements)
- /dataservers/{webId}/points (server points)
- /assetdatabases/{webId}/elements (database elements)
- /search/query (indexed search results)
The server includes a 'Links.Next' URL when more data is available.
"""
all_items = []
url = initial_url
while url:
response = session.get(url)
response.raise_for_status()
data = response.json()
items = data.get("Items", [])
all_items.extend(items)
# Check for continuation link
links = data.get("Links", {})
url = links.get("Next") # None if no more pages
if url:
print(f" Page loaded: {len(items)} items (total: {len(all_items)})")
return all_items
# Example: get all points on a data server (may be thousands)
server_web_id = "S1..."
all_points = get_all_with_continuation(
session,
f"{BASE_URL}/dataservers/{server_web_id}/points"
f"?maxCount=500&selectedFields=Items.WebId;Items.Name;Items.PointType;Links",
)
print(f"Total points on server: {len(all_points)}")
# Example: get all child elements (full AF tree level)
all_elements = get_all_with_continuation(
session,
f"{BASE_URL}/elements/{ROOT_WEB_ID}/elements"
f"?maxCount=200&selectedFields=Items.WebId;Items.Name;Items.HasChildren;Links",
)
print(f"Total child elements: {len(all_elements)}")
When continuation tokens are available
Continuation tokens are available on collection endpoints (elements, points, attributes, databases) but not on stream endpoints (/recorded, /interpolated, etc.). For stream data, use time-based pagination. You can detect whether an endpoint supports continuation by checking for a Links.Next field in the response.
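The detection reduces to checking the response body for a Links.Next URL. A minimal sketch (the helper name and the example URL are illustrative):

```python
def next_page_url(response_body: dict):
    """Return the continuation URL when the server advertises one, else None."""
    return response_body.get("Links", {}).get("Next")

# Collection endpoints include Links.Next while more pages remain:
print(next_page_url(
    {"Items": [], "Links": {"Next": "https://server/piwebapi/elements"}}
))
# Stream endpoints never include it:
print(next_page_url({"Items": []}))  # None
```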
Search pagination
The PI Web API search endpoint uses offset-based pagination with start and count parameters.
def search_all(
session, base_url: str, query: str, page_size: int = 100
) -> list[dict]:
"""Search for PI resources with full pagination.
Args:
query: Search query (e.g., "name:*temperature*")
page_size: Results per page (max depends on server config)
Returns:
All matching resources across all pages
"""
all_results = []
start = 0
while True:
response = session.get(
f"{base_url}/search/query",
params={
"q": query,
"count": page_size,
"start": start,
"selectedFields": (
"Items.Name;Items.WebId;Items.ItemType;"
"Items.Description;TotalHits"
),
},
)
response.raise_for_status()
data = response.json()
items = data.get("Items", [])
if not items:
break
all_results.extend(items)
# Check if we've retrieved everything
total_hits = data.get("TotalHits", 0)
if total_hits > 0 and start + page_size >= total_hits:
break
# Also stop if we got fewer than requested
if len(items) < page_size:
break
start += page_size
print(f" Search page: {len(all_results)}/{total_hits}")
return all_results
# Usage
results = search_all(session, BASE_URL, "name:*temperature*")
print(f"Found {len(results)} items matching 'temperature'")
Memory-efficient streaming for large datasets
When retrieving millions of values, loading everything into a list can exhaust memory. Use a generator pattern to process data in chunks without holding it all in memory.
from typing import Generator
import csv
from pathlib import Path
def stream_recorded_values(
session, base_url: str, web_id: str,
start: str, end: str,
page_size: int = 50000,
) -> Generator[list[dict], None, None]:
"""Yield pages of recorded values without loading all into memory.
Each yield returns one page of values. The caller processes and
discards each page before the next is loaded.
Yields:
Lists of value dicts, one list per page
"""
current_start = start
page = 0
while True:
page += 1
response = session.get(
f"{base_url}/streams/{web_id}/recorded",
params={
"startTime": current_start,
"endTime": end,
"maxCount": page_size,
"selectedFields": "Items.Timestamp;Items.Value;Items.Good",
},
)
response.raise_for_status()
        items = response.json().get("Items", [])
        if not items:
            break
        fetched = len(items)  # page size before boundary trimming
        # Drop the boundary value only when it repeats the previous
        # page's last timestamp (current_start holds that timestamp
        # after the first page)
        if page > 1 and items[0]["Timestamp"] == current_start:
            items = items[1:]
            if not items:
                break
        yield items
        if fetched < page_size:
            break
break
current_start = items[-1]["Timestamp"]
def export_to_csv(
session, base_url: str, web_id: str,
start: str, end: str,
output_file: str,
page_size: int = 50000,
):
"""Stream PI data directly to CSV without loading all into memory.
Memory usage stays constant regardless of dataset size.
"""
output = Path(output_file)
total_rows = 0
with open(output, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["Timestamp", "Value", "Good"])
for page in stream_recorded_values(
session, base_url, web_id, start, end, page_size
):
for item in page:
value = item["Value"]
# Handle digital states
if isinstance(value, dict):
value = value.get("Name", str(value))
writer.writerow([
item["Timestamp"],
value,
item.get("Good", True),
])
total_rows += 1
print(f" Written {total_rows} rows...")
print(f"Export complete: {total_rows} rows to {output_file}")
# Usage: export 1 year of data (potentially millions of values)
export_to_csv(
session, BASE_URL, WEB_ID,
start="*-365d", end="*",
output_file="reactor_temp_2024.csv",
page_size=50000,
)
# Memory usage: ~constant at ~50K values * ~100 bytes = ~5 MB per page
Choosing page_size
For memory-efficient streaming, page_size=50000 is a good balance. Each page is roughly 5-10 MB in memory. Smaller pages (10,000) mean more HTTP calls but less memory. Larger pages (100,000+) are faster but may time out on slow servers. If exporting to a database, match the page size to your database's optimal batch insert size.
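The trade-off can be sanity-checked numerically before an export. This sketch reuses the rough ~100 bytes/value figure from the comment above; the function name is ours:

```python
import math

def plan_export(total_values: int, page_size: int, bytes_per_value: int = 100):
    """Rough plan: HTTP calls needed and approximate in-memory size per page."""
    pages = math.ceil(total_values / page_size) if total_values else 0
    page_mb = page_size * bytes_per_value / 1_000_000
    return pages, page_mb

pages, page_mb = plan_export(2_000_000, 50_000)
print(f"{pages} pages, ~{page_mb:.0f} MB per page")  # 40 pages, ~5 MB per page
```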
Rate limiting and throttling
PI Web API does not have formal rate-limiting headers like public APIs (no X-RateLimit-Remaining). However, several mechanisms can throttle or reject your requests.
| Throttle source | Symptom | Mitigation |
|---|---|---|
| IIS connection limits | Connection refused or HTTP 503 | Limit concurrent connections to 10-20 per client. Use session reuse. |
| PI Data Archive query limits | HTTP 502 or slow responses | Reduce maxCount, narrow time ranges, add delays between pages. |
| PI Web API throttling config | HTTP 429 or 503 | Configured by admin. Back off and retry with exponential delay. |
| Request timeout (IIS) | HTTP 504 or connection reset | Use smaller page sizes. Complex queries (summaries over large ranges) are slower. |
| Kerberos ticket expiry | HTTP 401 after hours of running | Refresh Kerberos ticket in long-running ETL jobs. Check klist. |
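The "back off and retry with exponential delay" mitigation amounts to a capped exponential schedule. A minimal sketch with illustrative constants (adding random jitter is common when many clients share a server):

```python
def backoff_delays(base: float = 0.5, cap: float = 5.0, attempts: int = 6) -> list[float]:
    """Capped exponential backoff schedule in seconds (no jitter, for clarity)."""
    return [min(base * 2**i, cap) for i in range(attempts)]

print(backoff_delays())  # [0.5, 1.0, 2.0, 4.0, 5.0, 5.0]
```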
import time
import logging
logger = logging.getLogger(__name__)
def paginate_with_backpressure(
session, base_url: str, web_id: str,
start: str, end: str,
max_count: int = 10000,
min_delay: float = 0.05,
max_delay: float = 5.0,
) -> list[dict]:
"""Paginate with adaptive rate control.
Starts with minimal delay and increases it when the server is slow.
This prevents overwhelming the server during large extractions
while staying fast when the server has capacity.
"""
all_values = []
current_start = start
delay = min_delay
page = 0
while True:
page += 1
start_time = time.monotonic()
response = session.get(
f"{base_url}/streams/{web_id}/recorded",
params={
"startTime": current_start,
"endTime": end,
"maxCount": max_count,
"selectedFields": "Items.Timestamp;Items.Value;Items.Good",
},
)
elapsed = time.monotonic() - start_time
# Handle server errors with backoff
if response.status_code in (502, 503, 504):
delay = min(delay * 2, max_delay)
logger.warning(
f"Server returned {response.status_code}, "
f"backing off to {delay:.1f}s"
)
time.sleep(delay)
continue
response.raise_for_status()
        items = response.json().get("Items", [])
        if not items:
            break
        fetched = len(items)  # page size before boundary trimming
        if page > 1 and items[0]["Timestamp"] == all_values[-1]["Timestamp"]:
            items = items[1:]
            if not items:
                break
        all_values.extend(items)
        # Adaptive delay: speed up when the server is fast, back off when it lags
        if elapsed < 1.0:
            delay = max(delay * 0.8, min_delay)
        elif elapsed > 5.0:
            delay = min(delay * 1.5, max_delay)
        # A short page (before trimming) means we have all the data
        if fetched < max_count:
            break
current_start = items[-1]["Timestamp"]
logger.info(
f"Page {page}: {len(items)} values ({elapsed:.1f}s), "
f"total: {len(all_values)}, next delay: {delay:.2f}s"
)
time.sleep(delay)
    return all_values
Best practices for high-volume retrieval
- Use batch requests to reduce connection count.
- Limit concurrent connections to 10-20 per server.
- Use selectedFields to minimize response size.
- Prefer interpolated values over recorded when you do not need every event.
- Add adaptive delays between paginated requests.
- Monitor PI Web API server CPU and memory if you control the infrastructure.
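To see why interpolated retrieval bounds response size, count the values an interval query can return. This sketch assumes one value per interval boundary, inclusive of both endpoints, which is the typical interpolated behavior; the function name is ours:

```python
from datetime import timedelta

def interpolated_item_count(span: timedelta, interval: timedelta) -> int:
    """Values returned by an interpolated query: one per interval
    boundary, inclusive of both endpoints (typical behavior)."""
    return int(span / interval) + 1

# 30 days at a 1-hour interval is capped at 721 values, no matter
# how many recorded events the point actually stores:
print(interpolated_item_count(timedelta(days=30), timedelta(hours=1)))  # 721
```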
Server configuration reference
If you are a PI administrator or can work with one, these settings control pagination behavior. They are configured in PI Web API Admin (the management utility installed alongside PI Web API).
| Setting | Path in Admin | Notes |
|---|---|---|
| MaxItemsPerCall | System Configuration > API | Upper bound on maxCount for any single request. Increase for large extraction jobs, but monitor server memory. |
| BatchLimit | System Configuration > API | Maximum sub-requests per batch call. Default is typically sufficient. |
| Request Timeout | IIS Manager > Sites > PI Web API | Default 110 seconds. Increase if you see timeouts on large queries. |
| Max Connections | IIS Manager > Application Pools | Maximum concurrent connections. Default is usually fine for small teams. Increase for high-concurrency production workloads. |