
Writing Values
PI Web API lets you write single values, multiple historical values, and bulk updates to PI points. This guide covers each pattern in depth, including buffering behavior, update options, digital state writes, AF attributes, and production-grade error handling.
Write permissions required
Writing to PI requires explicit write permissions on the target points. Your PI Identity must have Data Access: Write permission on each point you want to write to. Contact your PI administrator if you get 403 errors. Always test writes against non-critical points first.
Write a single value
The simplest write: set the current value of a PI point.
POINT_WEB_ID = "your-web-id"
response = session.post(
    f"{BASE_URL}/streams/{POINT_WEB_ID}/value",
    json={
        "Value": 42.5,
        "Timestamp": "*",  # Now
    },
)
# Check the response
if response.status_code == 202:
    print("Value accepted (queued for writing)")
elif response.status_code == 204:
    print("Value written immediately")
else:
    print(f"Error: {response.status_code}")
    if response.text:
        print(f"Details: {response.text[:500]}")
Understanding 202 vs 204
| Status | Meaning |
|---|---|
| 202 Accepted | The value was accepted and queued for writing via the PI Buffer Subsystem. The write will complete asynchronously. This is the most common success response. |
| 204 No Content | The value was written directly to the PI Data Archive immediately. No buffering was used. |
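The two success codes can be folded into a small helper so that calling code does not repeat the comparison. This is a minimal sketch: the function name `classify_write_status` and its return labels are made up for illustration and are not part of PI Web API.

```python
def classify_write_status(status_code: int) -> str:
    """Map the HTTP status of a write request to a short label.

    The labels ("queued", "written", "failed") are hypothetical;
    only the 202/204 meanings come from the table above.
    """
    if status_code == 202:
        return "queued"   # accepted, completes asynchronously via the buffer
    if status_code == 204:
        return "written"  # written directly to the Data Archive
    return "failed"       # this sketch treats everything else as an error


# Example: pass response.status_code from a session.post(...) call
print(classify_write_status(202))
```

A caller that needs to confirm a value is in the archive can treat "queued" as "not yet verified" and read the value back later.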
Buffering options
PI Web API can buffer writes through the PI Buffer Subsystem, which provides reliability if the PI Data Archive is temporarily unavailable. The bufferOption parameter controls this behavior.
| bufferOption | Behavior | Use when |
|---|---|---|
| BufferIfPossible | Writes through the buffer if available, direct if not. Default behavior. | Most scenarios (the safe default) |
| Buffer | Always writes through the buffer. Fails if the buffer is not available. | When write durability is critical |
| DoNotBuffer | Writes directly to the Data Archive, bypassing the buffer. Fails if the Data Archive is unreachable. | When you need to verify the write happened immediately |
# Write with explicit buffering control
response = session.post(
    f"{BASE_URL}/streams/{POINT_WEB_ID}/value",
    json={"Value": 42.5, "Timestamp": "*"},
    params={"bufferOption": "BufferIfPossible"},
)
Update options
When writing to a timestamp that already has a value, the updateOption parameter controls what happens.
| updateOption | Behavior | Use when |
|---|---|---|
| Replace | Overwrites any existing value at that timestamp | Correcting bad data, backfill with overwrite |
| Insert | Adds the value and keeps any existing value at that timestamp, so two events can end up stored at the same time | When existing events must be kept and duplicate timestamps are acceptable |
| NoReplace | Only writes if no value exists at that timestamp. If one does, the new value is ignored. | Safe backfill that preserves existing data |
| InsertNoCompression | Writes without applying compression. Every value is stored. | When you need every value preserved regardless of compression settings |
| Remove | Deletes the value at that timestamp | Removing erroneous data points |
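Option names are plain strings, so a typo is easy to make. The sketch below checks names locally before building the query parameters. The helper `build_write_params` and the two constant sets are hypothetical, and the sets contain only the option names listed in this guide; the server may accept others.

```python
# Option names as listed in this guide (assumption: not necessarily exhaustive)
BUFFER_OPTIONS = {"BufferIfPossible", "Buffer", "DoNotBuffer"}
UPDATE_OPTIONS = {"Replace", "Insert", "NoReplace", "InsertNoCompression", "Remove"}


def build_write_params(update_option=None, buffer_option=None):
    """Return a query-parameter dict, rejecting unknown option names."""
    params = {}
    if update_option is not None:
        if update_option not in UPDATE_OPTIONS:
            raise ValueError(f"Unknown updateOption: {update_option!r}")
        params["updateOption"] = update_option
    if buffer_option is not None:
        if buffer_option not in BUFFER_OPTIONS:
            raise ValueError(f"Unknown bufferOption: {buffer_option!r}")
        params["bufferOption"] = buffer_option
    return params


# Example: pass the result as params= to session.post(...)
print(build_write_params("Replace", "Buffer"))
```

Omitting an argument leaves the parameter out of the request, so the server default applies.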
# Replace an existing value
response = session.post(
    f"{BASE_URL}/streams/{POINT_WEB_ID}/value",
    json={"Value": 99.9, "Timestamp": "2026-03-15T10:00:00Z"},
    params={"updateOption": "Replace"},
)
# Safe insert: only write if no value exists at this timestamp
response = session.post(
    f"{BASE_URL}/streams/{POINT_WEB_ID}/value",
    json={"Value": 50.0, "Timestamp": "2026-03-15T10:00:00Z"},
    params={"updateOption": "NoReplace"},
)
# Delete a specific value
response = session.post(
    f"{BASE_URL}/streams/{POINT_WEB_ID}/value",
    json={"Value": 0, "Timestamp": "2026-03-15T10:00:00Z"},
    params={"updateOption": "Remove"},
)
Write multiple values to one point
Send an array of timestamped values to write historical data to a single PI point.
from datetime import datetime, timedelta, timezone
POINT_WEB_ID = "your-web-id"
# Generate sample values (e.g., backfilling 10 hours of data)
now = datetime.now(timezone.utc)
values = [
    {
        "Value": 20.0 + i * 0.5,
        "Timestamp": (now - timedelta(hours=10 - i)).isoformat(),
    }
    for i in range(10)
]
response = session.post(
    f"{BASE_URL}/streams/{POINT_WEB_ID}/recorded",
    json={"Items": values},
    params={"updateOption": "NoReplace"},  # Skip timestamps that already have a value
)
if response.status_code in (202, 204):
    print(f"Successfully wrote {len(values)} values")
else:
    print(f"Error: {response.status_code}")
    # Check for per-item errors in the response body
    if response.text:
        body = response.json()
        if "Errors" in body:
            for error in body["Errors"]:
                print(f"  Error: {error}")
        if "Items" in body:
            for i, item in enumerate(body["Items"]):
                if item.get("Errors"):
                    print(f"  Item {i} failed: {item['Errors']}")
Write order matters
Send values sorted by timestamp in ascending order. PI Data Archive accepts out-of-order writes, but compression works best when values arrive in chronological order, and out-of-order writes can produce unexpected compression results.
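Sorting before sending is a one-liner. The helper below is a sketch; the name `sort_by_timestamp` is hypothetical. It assumes every timestamp is an ISO 8601 string with an explicit offset such as `+00:00` (the form `datetime.isoformat()` produces for an aware datetime), not a PI relative time such as `*`.

```python
from datetime import datetime


def sort_by_timestamp(values):
    """Return a new list of value dicts ordered oldest-first.

    Assumes each "Timestamp" is an ISO 8601 string with an explicit
    UTC offset. Timestamps are parsed rather than compared as strings,
    so mixed offsets still sort correctly.
    """
    return sorted(values, key=lambda v: datetime.fromisoformat(v["Timestamp"]))


values = [
    {"Value": 21.0, "Timestamp": "2026-03-15T11:00:00+00:00"},
    {"Value": 20.5, "Timestamp": "2026-03-15T10:00:00+00:00"},
]
ordered = sort_by_timestamp(values)
print([v["Timestamp"] for v in ordered])
```

The input list is left unmodified, which keeps the original order available for logging.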
Write to multiple points (batch)
Use the batch endpoint to write values to multiple PI points in a single HTTP request. This example serializes each sub-request body with json.dumps before placing it in the Content field.
import json
# Define the points and values to write
points_to_write = {
    "temperature": {"web_id": "F1DPaH...", "value": 72.5},
    "pressure": {"web_id": "F1DPbX...", "value": 14.7},
    "flow_rate": {"web_id": "F1DPcZ...", "value": 150.0},
}
# Build the batch request
batch_request = {}
for name, point in points_to_write.items():
    batch_request[name] = {
        "Method": "POST",
        "Resource": f"{BASE_URL}/streams/{point['web_id']}/value",
        "Content": json.dumps({
            "Value": point["value"],
            "Timestamp": "*",
        }),
    }
response = session.post(f"{BASE_URL}/batch", json=batch_request)
results = response.json()
# Check each sub-request result
for name, result in results.items():
    status = result["Status"]
    if status in (200, 202, 204):
        print(f"{name}: OK ({status})")
    else:
        content = result.get("Content", "")
        print(f"{name}: FAILED ({status}) - {content}")
Write digital state values
Digital state points accept either the state name (more readable) or the integer code of the state.
# Write a digital state by name
response = session.post(
    f"{BASE_URL}/streams/{DIGITAL_POINT_WEB_ID}/value",
    json={
        "Value": {"Name": "Active", "Value": 1},
        "Timestamp": "*",
    },
)
# Or write the integer code directly
# (the integer code depends on the point's digital state set)
response = session.post(
    f"{BASE_URL}/streams/{DIGITAL_POINT_WEB_ID}/value",
    json={
        "Value": 1,  # Integer code for the digital state
        "Timestamp": "*",
    },
)
# To find the valid digital states for a point,
# read the point's digital state set:
point_info = session.get(
    f"{BASE_URL}/points/{DIGITAL_POINT_WEB_ID}"
).json()
print(f"Digital set: {point_info.get('DigitalSetName', 'N/A')}")
Write to AF attributes
PI Web API can write to AF attributes, not just PI points. The example below uses the same streams endpoint and value format as a PI point write, with the attribute's WebID in place of the point's.
ATTRIBUTE_WEB_ID = "your-af-attribute-web-id"
# Write to an AF attribute (same value format as PI points)
response = session.post(
    f"{BASE_URL}/streams/{ATTRIBUTE_WEB_ID}/value",
    json={
        "Value": 42.0,
        "Timestamp": "*",
    },
)
# Note: the attribute must be configured to allow writes.
# Attributes backed by PI point data references write to the underlying PI point.
# Static attributes store the value directly in the AF database.
if response.status_code in (202, 204):
    print("Value written to AF attribute")
else:
    print(f"Error: {response.status_code} - {response.text[:200]}")
Backfill safety
Writing historical data (backfill) requires extra care to avoid corrupting existing data. Follow these guidelines:
- Use updateOption=NoReplace so that timestamps that already hold a value are skipped rather than overwritten or duplicated. This is the safest option for backfill.
- Verify before writing. Read the target time range first to confirm it is empty or contains the values you expect.
- Write in chronological order. Send values sorted by timestamp ascending for best compression behavior.
- Chunk large writes. Do not send more than 10,000 values in a single request. Break large backfills into daily or hourly chunks.
- Log everything. Record the time range, point name, value count, and status for every write batch. This makes debugging much easier if something goes wrong.
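The "chunk large writes" guideline mentions daily chunks. One way to produce them is to group values by UTC calendar day before writing, as in this sketch. The helper name `chunk_by_day` is hypothetical, and it assumes ISO 8601 timestamps with an explicit offset.

```python
from collections import defaultdict
from datetime import datetime, timezone


def chunk_by_day(values):
    """Group value dicts by UTC calendar day, oldest day first.

    Assumes each "Timestamp" is an ISO 8601 string with an explicit offset.
    Returns a list of (date, [value dicts]) tuples; the values inside each
    chunk are sorted by timestamp ascending.
    """
    buckets = defaultdict(list)
    for item in values:
        ts = datetime.fromisoformat(item["Timestamp"]).astimezone(timezone.utc)
        buckets[ts.date()].append((ts, item))
    chunks = []
    for day in sorted(buckets):
        # Sort on the parsed timestamp only, so dicts are never compared
        ordered = [item for _, item in sorted(buckets[day], key=lambda p: p[0])]
        chunks.append((day, ordered))
    return chunks


sample = [
    {"Value": 3, "Timestamp": "2026-03-16T00:30:00+00:00"},
    {"Value": 2, "Timestamp": "2026-03-15T23:00:00+00:00"},
    {"Value": 1, "Timestamp": "2026-03-15T08:00:00+00:00"},
]
for day, chunk in chunk_by_day(sample):
    print(day, len(chunk))
```

Each chunk carries its date, which gives a natural key for the per-batch log entry recommended above.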
def safe_backfill(session, base_url, web_id, values, chunk_size=5000):
    """Write historical values with safety checks and chunking.

    Args:
        values: List of {"Value": ..., "Timestamp": ...} dicts,
            sorted by timestamp ascending.
        chunk_size: Maximum values per write request.

    Returns:
        dict with "success" and "errors" counts.
    """
    success_count = 0
    error_count = 0
    for i in range(0, len(values), chunk_size):
        chunk = values[i : i + chunk_size]
        response = session.post(
            f"{base_url}/streams/{web_id}/recorded",
            json={"Items": chunk},
            params={"updateOption": "NoReplace"},  # Skip existing timestamps
        )
        if response.status_code in (202, 204):
            success_count += len(chunk)
            print(f"  Wrote {len(chunk)} values "
                  f"({chunk[0]['Timestamp']} to {chunk[-1]['Timestamp']})")
        else:
            error_count += len(chunk)
            print(f"  FAILED: {response.status_code} - {response.text[:200]}")
    return {"success": success_count, "errors": error_count}
# Usage
result = safe_backfill(session, BASE_URL, POINT_WEB_ID, my_values)
print(f"\nBackfill complete: {result['success']} written, {result['errors']} errors")
Idempotent write pattern
For reliable ETL pipelines that may retry on failure, use the Replace update option so that re-running the same write produces the same result without duplicates.
def idempotent_write(session, base_url, web_id, values):
    """Write values idempotently -- safe to retry on failure.

    Uses Replace so re-running with the same data produces
    the same result. No duplicates, no errors on retry.
    """
    response = session.post(
        f"{base_url}/streams/{web_id}/recorded",
        json={"Items": values},
        params={"updateOption": "Replace"},
    )
    if response.status_code in (202, 204):
        return True
    else:
        raise RuntimeError(
            f"Write failed: {response.status_code} - {response.text[:200]}"
        )
# This is safe to call multiple times with the same data
idempotent_write(session, BASE_URL, POINT_WEB_ID, my_values)
# Calling again with the same values: no error, no duplicates
idempotent_write(session, BASE_URL, POINT_WEB_ID, my_values)
Error handling
When writing multiple values, some may succeed while others fail. Always check the response body for per-item errors.
response = session.post(
    f"{BASE_URL}/streams/{POINT_WEB_ID}/recorded",
    json={"Items": values},
)
if response.status_code not in (202, 204):
    body = response.json() if response.text else {}
    # Check for overall errors
    if "Errors" in body:
        for error in body["Errors"]:
            print(f"Error: {error}")
    # Check for per-item errors
    if "Items" in body:
        for i, item in enumerate(body["Items"]):
            if item.get("Errors"):
                print(f"Item {i} ({values[i]['Timestamp']}): {item['Errors']}")
Common write errors
| Error | Cause | Fix |
|---|---|---|
| 403 Forbidden | No write permission on the point | Ask PI admin to grant Data Access: Write for your PI Identity |
| 409 Conflict | Value exists at that timestamp | Use updateOption=Replace to overwrite |
| 400 Bad Request | Value type mismatch (e.g., string to float point) | Check the point type and send matching value type |
| 500 Internal Server Error | PI Data Archive unreachable or overloaded | Retry with backoff, check Data Archive connectivity |
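The "retry with backoff" fix for 5xx responses can be sketched as a small wrapper. Everything here is an assumption for illustration: the name `post_with_retry`, the default attempt count, and the doubling delay schedule are not prescribed by PI Web API. The wrapper takes a zero-argument callable so that it stays independent of how the request is built.

```python
import time


def post_with_retry(send, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns a non-5xx response or attempts run out.

    send: zero-argument callable returning an object with .status_code,
          for example: lambda: session.post(url, json=payload)
    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts.
    Network exceptions are not caught here; they propagate to the caller.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        response = send()
        if response.status_code < 500:
            return response  # success, or a client error that a retry won't fix
        if attempt < max_attempts - 1:
            sleep(base_delay * 2 ** attempt)
    return response  # last 5xx response; the caller decides what to do
```

Retrying is only safe when the write itself is repeatable, which is why this pairs naturally with the Replace-based idempotent write pattern described earlier.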