Advanced Recipes
Production-grade patterns that go beyond the basics. Each recipe solves a real integration problem with complete, copy-paste-ready Python code.
Start with the basics first
These recipes assume you are comfortable with session setup, authentication, and basic reads/writes. If not, start with the PI Web API Cookbook first.
Recipe 1: Parallel reads with concurrent.futures
When you need to read hundreds of points, sequential requests are too slow. Use a thread pool to parallelize reads while respecting server limits. For even better performance, use the batch endpoint (Recipe 8 in the Cookbook) -- but parallel reads are useful when you need recorded/interpolated data per point.
from concurrent.futures import ThreadPoolExecutor, as_completed
import pandas as pd
def read_recorded_for_point(session, base_url, web_id, point_name,
                            start_time="*-24h", end_time="*", timeout=60):
    """Read recorded values for a single point into a DataFrame.

    Args:
        session: requests-like session (must expose ``get``).
        base_url: PI Web API root URL, no trailing slash.
        web_id: WebId of the stream to read.
        point_name: Human-readable name stamped onto every result row.
        start_time: PI time expression for the window start.
        end_time: PI time expression for the window end.
        timeout: Per-request timeout in seconds, so a hung server cannot
            block a worker thread forever when called from a thread pool.

    Returns:
        DataFrame with Timestamp (UTC), Value, Good, and PointName columns;
        empty DataFrame when the window contains no events.

    Raises:
        requests.HTTPError: On a non-2xx response (via raise_for_status).
    """
    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "maxCount": 10000,
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
        timeout=timeout,
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])
    df = pd.DataFrame(items)
    if not df.empty:
        df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
        df["PointName"] = point_name
    return df
def read_many_recorded(session, base_url, points, max_workers=10,
                       start_time="*-24h", end_time="*"):
    """Fan out recorded-value reads over a thread pool and combine them.

    Args:
        points: list of (name, web_id) tuples
        max_workers: Number of concurrent threads. Start with 10,
            increase only if the server handles it well.
            Monitor PI Web API CPU when increasing.

    Returns: Combined DataFrame with PointName column
    """
    frames = []
    failures = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its point name for error reporting.
        future_to_name = {}
        for name, wid in points:
            fut = executor.submit(
                read_recorded_for_point, session, base_url,
                wid, name, start_time, end_time,
            )
            future_to_name[fut] = name
        for fut in as_completed(future_to_name):
            point = future_to_name[fut]
            try:
                frame = fut.result()
            except Exception as exc:
                # Collect the failure instead of aborting the whole run.
                failures.append({"point": point, "error": str(exc)})
            else:
                if not frame.empty:
                    frames.append(frame)
    if failures:
        print(f"Warning: {len(failures)} reads failed:")
        for err in failures:
            print(f" {err['point']}: {err['error']}")
    if frames:
        return pd.concat(frames, ignore_index=True)
    return pd.DataFrame()
# Usage
points = [
    ("Temperature_R1", "F1DPaH..."),  # (point name, WebId) pairs
    ("Pressure_R1", "F1DPbX..."),
    ("Flow_R1", "F1DPcY..."),
    # ... up to hundreds of points
]
df = read_many_recorded(session, BASE_URL, points, max_workers=10)
print(f"Read {len(df)} total values across {df['PointName'].nunique()} points")Respect server capacity
Start with 10 concurrent workers and monitor PI Web API server CPU and response times. Most PI Web API servers handle 10-20 concurrent connections well. Going higher risks 503 errors and can affect other users of the same PI Web API server.
Recipe 2: Resilient requests with retries and circuit breaker
Network glitches, server restarts, and transient 503 errors happen in production. This recipe adds retry logic for reads and a circuit breaker pattern that backs off when the server is struggling.
import time
import logging
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Shared module-level logger used by the session factory and circuit breaker.
logger = logging.getLogger("piwebapi")
def create_resilient_session(username, password, ca_bundle=False):
    """Create a session with automatic retries and connection pooling.

    Args:
        username: Basic-auth user (e.g. "DOMAIN\\\\user").
        password: Basic-auth password.
        ca_bundle: Path to a CA bundle for TLS verification. The default
            of False DISABLES certificate verification -- tolerable in a
            lab, dangerous in production. Pass a real bundle path there.

    Returns:
        requests.Session configured to retry idempotent GETs on
        transient failures (408/429/5xx) with exponential backoff.
    """
    import requests

    session = requests.Session()
    session.auth = (username, password)
    session.verify = ca_bundle
    if not ca_bundle:
        # Make the insecure default loud instead of silent.
        logger.warning(
            "TLS certificate verification is DISABLED. "
            "Pass ca_bundle=<path to CA bundle> in production."
        )
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,  # 1s, 2s, 4s between retries
        status_forcelist=[408, 429, 500, 502, 503, 504],
        allowed_methods=["GET"],  # Only retry safe (read) methods
        raise_on_status=False,
    )
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,  # Connection pool size
        pool_maxsize=10,      # Max connections per host
    )
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session
class CircuitBreaker:
    """Minimal circuit breaker for PI Web API requests.

    Repeated failures trip the breaker into the "open" state, where
    requests are refused for a cooldown window so a struggling server
    gets room to recover. One probe is then allowed ("half-open");
    any success fully closes the breaker again.
    """

    def __init__(self, failure_threshold=5, cooldown_seconds=30):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.last_failure_time = 0
        self.state = "closed"  # closed = healthy, open = blocking

    def record_success(self):
        # Any success resets the failure count and closes the breaker.
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures < self.failure_threshold:
            return
        self.state = "open"
        logger.warning(
            f"Circuit breaker OPEN after {self.failures} failures. "
            f"Blocking requests for {self.cooldown_seconds}s."
        )

    def allow_request(self) -> bool:
        """Return True when a request may proceed."""
        if self.state == "closed":
            return True
        # Open: refuse until the cooldown elapses, then let a probe through.
        if time.time() - self.last_failure_time < self.cooldown_seconds:
            return False
        self.state = "half-open"
        logger.info("Circuit breaker half-open. Allowing test request.")
        return True
def resilient_get(session, url, circuit_breaker, **kwargs):
    """GET *url* through the circuit breaker; return the response or None.

    Returns None when the breaker is open or the request raised an
    exception -- callers must check for None before using the response.
    """
    if not circuit_breaker.allow_request():
        logger.warning(f"Circuit breaker OPEN. Skipping request to {url}")
        return None
    try:
        resp = session.get(url, **kwargs)
    except Exception as e:
        circuit_breaker.record_failure()
        logger.error(f"Request failed: {e}")
        return None
    # 5xx counts against the breaker; anything below (incl. 4xx) does not.
    if resp.status_code >= 500:
        circuit_breaker.record_failure()
    else:
        circuit_breaker.record_success()
    return resp
# Usage
session = create_resilient_session("DOMAIN\\user", "password")
breaker = CircuitBreaker(failure_threshold=5, cooldown_seconds=30)
# resilient_get returns None when the breaker is open or the request raised.
resp = resilient_get(session, f"{BASE_URL}/streams/{WEB_ID}/value", breaker)
if resp and resp.ok:
print(f"Value: {resp.json()['Value']}")Why only retry GETs?
Retrying POST (write) requests can cause duplicate writes. If you need to retry writes, add idempotency logic: use updateOption=Replace so duplicate writes overwrite rather than conflict.
Recipe 3: Change detection with batch snapshots
Instead of polling every point individually, use the batch endpoint to take periodic snapshots and detect which values actually changed. Much more efficient than individual polling.
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class ChangeEvent:
    """One detected change for a single PI point between two snapshots."""
    point_name: str
    web_id: str
    old_value: object   # None on the first snapshot (no previous value)
    new_value: object
    old_timestamp: str  # None on the first snapshot
    new_timestamp: str
class ChangeDetector:
    """Detect which PI points changed between polls using batch reads.

    Keeps the previous snapshot in memory and reports only the points
    whose snapshot timestamp moved since the last take_snapshot() call.
    """

    def __init__(self, session, base_url, points):
        """
        Args:
            points: list of (name, web_id) tuples
        """
        self.session = session
        self.base_url = base_url
        self.points = points
        # name -> web_id lookup built once, instead of a linear scan
        # through self.points for every detected change.
        self._web_id_by_name = dict(points)
        self.last_snapshot: dict = {}

    def take_snapshot(self) -> list[ChangeEvent]:
        """Read current values via batch and return only changed points."""
        # One batch request covers every point (single round trip).
        batch = {}
        for name, wid in self.points:
            batch[name] = {
                "Method": "GET",
                "Resource": (
                    f"{self.base_url}/streams/{wid}/value"
                    f"?selectedFields=Timestamp;Value;Good"
                ),
            }
        resp = self.session.post(f"{self.base_url}/batch", json=batch)
        if not resp.ok:
            return []

        # Parse results and detect changes.
        changes = []
        current = {}
        for name, result in resp.json().items():
            if result["Status"] != 200:
                continue  # failed reads are left out of the new snapshot
            content = result["Content"]
            value = content["Value"]
            if isinstance(value, dict):
                # Digital states arrive as objects; keep the state name.
                value = value.get("Name", str(value))
            current[name] = {
                "value": value,
                "timestamp": content["Timestamp"],
            }
            # A new snapshot timestamp means the point updated since the
            # previous poll (or there was no previous poll at all).
            prev = self.last_snapshot.get(name)
            if prev is None or prev["timestamp"] != content["Timestamp"]:
                changes.append(ChangeEvent(
                    point_name=name,
                    web_id=self._web_id_by_name[name],
                    old_value=prev["value"] if prev else None,
                    new_value=value,
                    old_timestamp=prev["timestamp"] if prev else None,
                    new_timestamp=content["Timestamp"],
                ))
        self.last_snapshot = current
        return changes
# Usage
points = [
    ("Reactor_Temp", temp_wid),
    ("Reactor_Pressure", pressure_wid),
    ("Feed_Flow", flow_wid),
]
detector = ChangeDetector(session, BASE_URL, points)
# First call: everything is "changed" (no previous snapshot)
changes = detector.take_snapshot()
print(f"Initial snapshot: {len(changes)} points")
# Subsequent calls: only actually changed points
import time
time.sleep(10)
changes = detector.take_snapshot()
print(f"Changed since last poll: {len(changes)} points")
for c in changes:
print(f" {c.point_name}: {c.old_value} -> {c.new_value}")Recipe 4: Historical data backfill
Backfilling large time ranges requires chunking to avoid timeouts and memory issues. This recipe reads recorded values in configurable chunks with progress reporting and truncation detection.
import pandas as pd
from datetime import datetime, timedelta, timezone
def backfill_recorded(
    session, base_url, web_id, point_name,
    start, end,
    chunk_days=1, max_count=10000,
    selected_fields="Items.Timestamp;Items.Value;Items.Good",
):
    """Read recorded values in chunks to avoid timeouts.

    Args:
        start: Start datetime (timezone-aware)
        end: End datetime (timezone-aware)
        chunk_days: Size of each time window in days (must be > 0)
        max_count: Max values per chunk request

    Returns: Combined DataFrame

    Raises:
        ValueError: If chunk_days is not positive (the chunk loop would
            otherwise never advance and spin forever).
    """
    if chunk_days <= 0:
        raise ValueError("chunk_days must be positive")
    all_frames = []
    chunk_start = start
    total_values = 0
    truncated_chunks = 0
    print(f"Backfilling {point_name}: {start.date()} to {end.date()}")
    while chunk_start < end:
        chunk_end = min(chunk_start + timedelta(days=chunk_days), end)
        resp = session.get(
            f"{base_url}/streams/{web_id}/recorded",
            params={
                "startTime": chunk_start.isoformat(),
                "endTime": chunk_end.isoformat(),
                "maxCount": max_count,
                # "Inside" avoids duplicated boundary events between
                # adjacent chunks. NOTE(review): confirm events stamped
                # exactly on a chunk boundary are not skipped by the server.
                "boundaryType": "Inside",
                "selectedFields": selected_fields,
            },
        )
        resp.raise_for_status()
        items = resp.json().get("Items", [])
        if items:
            df = pd.DataFrame(items)
            df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
            all_frames.append(df)
            total_values += len(items)
            # A full chunk means the server stopped at maxCount, so some
            # events in this window were silently dropped.
            if len(items) == max_count:
                truncated_chunks += 1
                print(f" {chunk_start.date()}: {len(items)} values (TRUNCATED - reduce chunk_days)")
            else:
                print(f" {chunk_start.date()}: {len(items)} values")
        chunk_start = chunk_end
    if truncated_chunks > 0:
        print(f"\nWARNING: {truncated_chunks} chunk(s) were truncated. "
              f"Reduce chunk_days or increase max_count to get all data.")
    if not all_frames:
        print("No data found in the specified range.")
        return pd.DataFrame()
    result = pd.concat(all_frames, ignore_index=True)
    # NOTE(review): dedupe is keyed on Timestamp only; legitimate multiple
    # events at the same timestamp would collapse to one -- confirm the
    # archive never stores several values per timestamp for these points.
    result = result.drop_duplicates(subset=["Timestamp"])
    result = result.sort_values("Timestamp").reset_index(drop=True)
    print(f"\nTotal: {len(result)} unique recorded values")
    return result
# Usage: backfill 90 days of data
df = backfill_recorded(
    session, BASE_URL, WEB_ID, "Temperature_R1",
    start=datetime(2026, 1, 1, tzinfo=timezone.utc),
    end=datetime(2026, 3, 31, tzinfo=timezone.utc),
    chunk_days=1,
    max_count=50000,
)
# Save to CSV
if not df.empty:
    df.to_csv("temperature_backfill.csv", index=False)
print(f"Saved to temperature_backfill.csv")Recipe 5: Multi-server aggregation
Some organizations run multiple PI Data Archives across plants or regions. This recipe reads the same metric from multiple servers in parallel and combines the results for cross-plant comparison.
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
# One entry per PI Web API instance; point_path is the \\server\tag path
# of the equivalent point on that archive.
SERVERS = [
    {
        "name": "Plant-A",
        "url": "https://pi-a/piwebapi",
        "point_path": "\\\\PI-A\\Flow_Rate",
    },
    {
        "name": "Plant-B",
        "url": "https://pi-b/piwebapi",
        "point_path": "\\\\PI-B\\Flow_Rate",
    },
    {
        "name": "Plant-C",
        "url": "https://pi-c/piwebapi",
        "point_path": "\\\\PI-C\\Flow_Rate",
    },
]
def read_from_server(session, server, start_time, end_time):
    """Read recorded values from one server.

    Resolves the server's point path to a WebId, reads the window, and
    stamps a Server column so results from multiple plants can be mixed.
    """
    # Resolve the point path to a WebId first.
    lookup = session.get(
        f"{server['url']}/points",
        params={
            "path": server["point_path"],
            "selectedFields": "WebId",
        },
    )
    lookup.raise_for_status()
    web_id = lookup.json()["WebId"]

    # Then read the recorded values for that stream.
    data = session.get(
        f"{server['url']}/streams/{web_id}/recorded",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "maxCount": 10000,
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    data.raise_for_status()
    items = data.json().get("Items", [])
    if not items:
        return pd.DataFrame()
    frame = pd.DataFrame(items)
    frame["Timestamp"] = pd.to_datetime(frame["Timestamp"], utc=True)
    frame["Server"] = server["name"]
    return frame
def aggregate_servers(session, servers, start_time, end_time):
    """Read from all servers in parallel and combine.

    One thread per server; failures are reported per server and the
    remaining results are still combined. Returns an empty DataFrame
    when nothing was read.
    """
    frames = []
    errors = []
    with ThreadPoolExecutor(max_workers=len(servers)) as pool:
        pending = {}
        for srv in servers:
            fut = pool.submit(read_from_server, session, srv,
                              start_time, end_time)
            pending[fut] = srv["name"]
        # Collect in submission order so output row order is deterministic.
        for fut, name in pending.items():
            try:
                frame = fut.result()
            except Exception as e:
                errors.append(name)
                print(f" {name}: ERROR - {e}")
                continue
            if frame.empty:
                print(f" {name}: no data")
            else:
                frames.append(frame)
                print(f" {name}: {len(frame)} values")
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
# Usage
print("Reading from all plants...")
df = aggregate_servers(session, SERVERS, "*-24h", "*")
if not df.empty:
    # Cross-plant summary
    summary = df.groupby("Server")["Value"].agg(["mean", "min", "max", "count"])
    print(f"\nCross-plant summary:")
    print(summary)

Recipe 6: Data quality validation
Validate PI data quality before downstream processing. Detect stale data, bad quality percentages, outliers, and gaps. Essential for production data pipelines.
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
import pandas as pd
@dataclass
class QualityReport:
    """Data-quality summary for one PI point over an analysis window."""
    point_name: str
    total_values: int        # recorded events found in the window
    good_count: int          # events flagged Good by PI
    bad_count: int           # total_values - good_count
    good_percent: float      # server-side PercentGood when available
    latest_timestamp: str    # ISO timestamp of newest event ("N/A" if none)
    is_stale: bool           # newest event older than the stale threshold
    stale_minutes: float     # age of the newest event, in minutes
    value_min: float | None  # server-side Minimum (None if unavailable)
    value_max: float | None  # server-side Maximum
    value_mean: float | None # server-side Average
    has_outliers: bool
    outlier_count: int       # values beyond N std devs from the mean
    gap_count: int           # gaps larger than the allowed interval
    max_gap_minutes: float   # largest inter-event gap observed
def validate_data_quality(
    session, base_url, web_id, point_name,
    start_time="*-24h", end_time="*",
    stale_threshold_minutes=30,
    outlier_std_threshold=4.0,
    expected_interval_minutes=5,
    max_gap_multiplier=3,
):
    """Validate data quality for a PI point.

    Combines a server-side summary (min/max/mean/percent-good) with a
    recorded-values read used for staleness, outlier, and gap analysis.

    Args:
        stale_threshold_minutes: Flag if latest value is older than this
        outlier_std_threshold: Flag values more than N std devs from mean
        expected_interval_minutes: Expected time between events
        max_gap_multiplier: Flag gaps > this * expected_interval

    Returns: QualityReport
    """
    # Get summary statistics (computed server-side over the full window).
    resp = session.get(
        f"{base_url}/streams/{web_id}/summary",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "summaryType": "Average,Minimum,Maximum,Count,PercentGood",
        },
    )
    resp.raise_for_status()
    stats = {}
    for item in resp.json().get("Items", []):
        stats[item["Type"]] = item["Value"].get("Value")

    # Get recorded values for gap and outlier analysis.
    resp = session.get(
        f"{base_url}/streams/{web_id}/recorded",
        params={
            "startTime": start_time,
            "endTime": end_time,
            "maxCount": 50000,
            "selectedFields": "Items.Timestamp;Items.Value;Items.Good",
        },
    )
    resp.raise_for_status()
    items = resp.json().get("Items", [])
    if not items:
        # No events at all: report an empty, maximally-stale window.
        return QualityReport(
            point_name=point_name, total_values=0,
            good_count=0, bad_count=0, good_percent=0.0,
            latest_timestamp="N/A", is_stale=True, stale_minutes=999,
            value_min=None, value_max=None, value_mean=None,
            has_outliers=False, outlier_count=0,
            gap_count=0, max_gap_minutes=0,
        )

    df = pd.DataFrame(items)
    df["Timestamp"] = pd.to_datetime(df["Timestamp"], utc=True)
    df = df.sort_values("Timestamp")

    # Staleness check
    latest_ts = df["Timestamp"].max()
    now = datetime.now(timezone.utc)
    stale_minutes = (now - latest_ts).total_seconds() / 60

    # Quality counts
    good_count = int(df["Good"].sum()) if "Good" in df.columns else len(df)
    bad_count = len(df) - good_count

    # Outlier detection (only on numeric, good-quality values).
    # Build the mask explicitly: the previous df.get("Good", True) form
    # returned the scalar True when the Good column was missing, and
    # df[True] raises KeyError -- a latent crash on sparse responses.
    if "Good" in df.columns:
        good_mask = df["Good"] == True  # noqa: E712 -- PI may return None here
    else:
        good_mask = pd.Series(True, index=df.index)
    good_numeric = df.loc[good_mask, "Value"].apply(
        lambda v: v if isinstance(v, (int, float)) else None
    ).dropna()
    outlier_count = 0
    if len(good_numeric) > 10:  # too few samples makes std meaningless
        mean = good_numeric.mean()
        std = good_numeric.std()
        if std > 0:
            outlier_count = int(
                ((good_numeric - mean).abs() > outlier_std_threshold * std).sum()
            )

    # Gap detection
    timestamps = df["Timestamp"].sort_values()
    gaps = timestamps.diff().dropna()
    max_gap = timedelta(minutes=expected_interval_minutes * max_gap_multiplier)
    large_gaps = gaps[gaps > max_gap]
    max_gap_minutes = gaps.max().total_seconds() / 60 if len(gaps) > 0 else 0

    return QualityReport(
        point_name=point_name,
        total_values=len(df),
        good_count=good_count,
        bad_count=bad_count,
        # Prefer the server-side PercentGood; fall back to the local count.
        good_percent=stats.get("PercentGood", (good_count / len(df) * 100)),
        latest_timestamp=latest_ts.isoformat(),
        is_stale=stale_minutes > stale_threshold_minutes,
        stale_minutes=round(stale_minutes, 1),
        value_min=stats.get("Minimum"),
        value_max=stats.get("Maximum"),
        value_mean=stats.get("Average"),
        has_outliers=outlier_count > 0,
        outlier_count=outlier_count,
        gap_count=len(large_gaps),
        max_gap_minutes=round(max_gap_minutes, 1),
    )
# Usage
points = [
    ("Reactor_Temp", reactor_temp_wid),
    ("Feed_Pressure", feed_pressure_wid),
    ("Product_Flow", product_flow_wid),
]
# Fixed-width header for the per-point quality table printed below.
print(f"{'Point':<20} {'Values':>8} {'Good%':>8} {'Stale':>6} {'Outliers':>9} {'Gaps':>5}")
print("-" * 62)
for name, wid in points:
    report = validate_data_quality(session, BASE_URL, wid, name)
    stale_flag = "YES" if report.is_stale else "no"
    outlier_flag = str(report.outlier_count) if report.has_outliers else "0"
print(f"{name:<20} {report.total_values:>8} {report.good_percent:>7.1f}% "
f"{stale_flag:>6} {outlier_flag:>9} {report.gap_count:>5}")Recipe 7: Monitoring and alerting integration
Integrate PI data monitoring with your alerting stack. This recipe checks point values against thresholds and sends alerts via webhooks (compatible with Slack, Teams, PagerDuty, etc.).
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
# Module-level logger for the monitoring helpers below.
logger = logging.getLogger("pi_monitor")
@dataclass
class AlertRule:
    """Threshold and staleness limits for one PI point."""
    point_name: str
    web_id: str
    high_limit: float | None = None  # alert when value > high_limit
    low_limit: float | None = None   # alert when value < low_limit
    stale_minutes: int = 30          # alert when newest value is older than this
    description: str = ""
@dataclass
class Alert:
    """One triggered alert produced by check_alerts."""
    rule: AlertRule
    alert_type: str  # "high", "low", "stale", "bad_quality", or "read_error"
    value: object
    timestamp: str
    message: str
def check_alerts(session, base_url, rules: list[AlertRule]) -> list[Alert]:
    """Check all alert rules against current PI values.

    Uses batch endpoint for efficiency (one request for all points).
    A single rule can raise several alerts at once (e.g. both stale
    and high); an unreadable point raises a "read_error" alert.

    Returns:
        List of triggered Alert objects; empty when all points are healthy
        or the batch request itself failed.
    """
    # Batch read all points in one round trip.
    batch = {}
    for rule in rules:
        batch[rule.point_name] = {
            "Method": "GET",
            "Resource": (
                f"{base_url}/streams/{rule.web_id}/value"
                f"?selectedFields=Timestamp;Value;Good"
            ),
        }
    resp = session.post(f"{base_url}/batch", json=batch)
    if not resp.ok:
        logger.error(f"Batch read failed: {resp.status_code}")
        return []
    # Parse the body once instead of re-parsing JSON for every rule.
    results = resp.json()

    alerts = []
    now = datetime.now(timezone.utc)
    for rule in rules:
        result = results.get(rule.point_name, {})
        if result.get("Status") != 200:
            alerts.append(Alert(
                rule=rule, alert_type="read_error",
                value=None, timestamp=now.isoformat(),
                message=f"Cannot read {rule.point_name}: HTTP {result.get('Status')}",
            ))
            continue
        content = result["Content"]
        value = content["Value"]
        timestamp = content["Timestamp"]
        good = content.get("Good", True)

        # Check quality
        if not good:
            alerts.append(Alert(
                rule=rule, alert_type="bad_quality",
                value=value, timestamp=timestamp,
                message=f"{rule.point_name} has bad quality data",
            ))

        # Check staleness; a malformed timestamp must not kill the sweep.
        try:
            ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            age_minutes = (now - ts).total_seconds() / 60
            if age_minutes > rule.stale_minutes:
                alerts.append(Alert(
                    rule=rule, alert_type="stale",
                    value=value, timestamp=timestamp,
                    message=f"{rule.point_name} is stale ({age_minutes:.0f}m old)",
                ))
        except Exception:
            pass  # deliberate best-effort: skip staleness for this point

        # Check thresholds (only for numeric values)
        if isinstance(value, (int, float)):
            if rule.high_limit is not None and value > rule.high_limit:
                alerts.append(Alert(
                    rule=rule, alert_type="high",
                    value=value, timestamp=timestamp,
                    message=f"{rule.point_name} = {value} exceeds high limit {rule.high_limit}",
                ))
            if rule.low_limit is not None and value < rule.low_limit:
                alerts.append(Alert(
                    rule=rule, alert_type="low",
                    value=value, timestamp=timestamp,
                    message=f"{rule.point_name} = {value} below low limit {rule.low_limit}",
                ))
    return alerts
def send_webhook(url: str, alerts: list[Alert]):
    """POST each alert to a webhook (Slack, Teams, PagerDuty, etc.).

    Delivery failures are logged and skipped so a single bad delivery
    never blocks the remaining alerts.
    """
    import requests as req

    for alert in alerts:
        body = {
            "text": f"PI Alert [{alert.alert_type.upper()}]: {alert.message}",
            "point": alert.rule.point_name,
            "value": str(alert.value),
            "timestamp": alert.timestamp,
            "type": alert.alert_type,
        }
        try:
            req.post(url, json=body, timeout=10)
        except Exception as e:
            logger.error(f"Webhook failed: {e}")
# Usage
rules = [
    AlertRule("Reactor_Temp", reactor_wid, high_limit=350.0, low_limit=200.0,
              stale_minutes=15, description="Reactor temperature"),
    AlertRule("Feed_Pressure", pressure_wid, high_limit=150.0,
              stale_minutes=10, description="Feed pressure"),
    AlertRule("Product_Flow", flow_wid, low_limit=10.0,
              stale_minutes=30, description="Product flow rate"),
]
alerts = check_alerts(session, BASE_URL, rules)
if alerts:
    print(f"{len(alerts)} alert(s) triggered:")
    for a in alerts:
        print(f" [{a.alert_type}] {a.message}")
    # send_webhook("https://hooks.slack.com/services/...", alerts)
else:
print("All points within normal limits.")Recipe 8: Scheduled snapshot to database
A production-grade pattern that reads current values on a schedule and inserts them into a SQL database. More robust than CSV files for operational data capture.
"""Scheduled PI snapshot to SQLite database.
For production, replace SQLite with PostgreSQL, MySQL, or your
preferred database. The pattern is the same.
"""
import sqlite3
from datetime import datetime, timezone
def init_database(db_path="pi_snapshots.db"):
    """Open (or create) the SQLite database and ensure the schema exists.

    Args:
        db_path: File path of the database; created on first use.

    Returns:
        An open sqlite3.Connection with the snapshots table and its
        lookup index ready. The caller owns closing the connection.
    """
    conn = sqlite3.connect(db_path)
    # One row per point per capture; value/value_text split keeps
    # numeric and digital-state readings queryable separately.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            capture_time TEXT NOT NULL,
            point_name TEXT NOT NULL,
            value REAL,
            value_text TEXT,
            timestamp TEXT,
            good BOOLEAN,
            error TEXT
        )
    """)
    conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_snapshots_point_time
        ON snapshots (point_name, capture_time)
    """)
    conn.commit()
    return conn
def capture_snapshot(session, base_url, points, conn):
    """Batch-read current values for *points* and insert one row each.

    Args:
        points: list of (name, web_id) tuples
        conn: SQLite connection

    Returns:
        Number of rows inserted (0 when the batch request itself failed).
    """
    capture_time = datetime.now(timezone.utc).isoformat()

    # One batch call reads every point in a single round trip.
    batch = {
        name: {
            "Method": "GET",
            "Resource": (
                f"{base_url}/streams/{wid}/value"
                f"?selectedFields=Timestamp;Value;Good"
            ),
        }
        for name, wid in points
    }
    resp = session.post(f"{base_url}/batch", json=batch)
    if not resp.ok:
        print(f"Batch read failed: {resp.status_code}")
        return 0

    rows = []
    for name, result in resp.json().items():
        if result["Status"] != 200:
            # Record the failure itself so gaps are visible in the table.
            rows.append((
                capture_time, name, None, None, None, False,
                f"HTTP {result['Status']}"
            ))
            continue
        content = result["Content"]
        raw = content["Value"]
        # Handle digital states and non-numeric values.
        numeric_value = None
        text_value = None
        if isinstance(raw, (int, float)):
            numeric_value = raw
        elif isinstance(raw, dict):
            text_value = raw.get("Name", str(raw))
        else:
            text_value = str(raw)
        rows.append((
            capture_time, name, numeric_value, text_value,
            content["Timestamp"], content.get("Good", True), None
        ))

    conn.executemany(
        "INSERT INTO snapshots "
        "(capture_time, point_name, value, value_text, timestamp, good, error) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        rows,
    )
    conn.commit()
    return len(rows)
# Usage (call from cron or Task Scheduler)
conn = init_database()
points = [
    ("Reactor_Temp", reactor_temp_wid),
    ("Feed_Pressure", feed_pressure_wid),
    ("Product_Flow", product_flow_wid),
]
# Each run appends one row per point, stamped with the capture time.
count = capture_snapshot(session, BASE_URL, points, conn)
print(f"Captured {count} values at {datetime.now(timezone.utc).isoformat()}")
conn.close()

For production scheduling
Use cron (Linux) or Task Scheduler (Windows) to run this script on a schedule. For orchestrated pipelines, use Apache Airflow, Prefect, or Dagster. Do not use time.sleep() loops in production -- they do not survive restarts and are hard to monitor.