Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for max_results argument #8

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,14 @@ dagent exec list_tags --conn_name=pi
dagent exec read_tag_values_period --conn_name=pi --tags="['sinusoid', 'sinusoidu']" --first_timestamp=*-100h --last_timestamp=*
dagent exec copy_period --src_conn=pi --tags="['SINUSOID', 'sinusoidu']" --dest_conn=csv --dest_group='sinus.csv' --first_timestamp=*-100h --last_timestamp=*
```


## Troubleshooting

### OSIsoft.AF.PI.PITimeoutException when reading historical data

Increase the SDK data access timeout settings on the client machine. There are two timeouts for the SDK, a connection timeout and a data access timeout. The connection timeout default is 10 seconds. The data access timeout is 60 seconds. Data access timeouts are the most likely cause of the error.
* Launch AboutPI-SDK.exe.
* Navigate to the Connections tab.
* Select the PI Data Archive in question.
* Increase the Data Access Timeout and Connection Timeout to 120 seconds or more.
45 changes: 29 additions & 16 deletions src/data_agent_osisoft_pi/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ def timestamp_to_datetime(timestamp):

MAP_TIME_FREQUENCY_TO_PI = {"raw data": None}

DEFAULT_PAGE_SIZE = 200000


def cast2python(val):
if str(type(val)) == "<class 'System.DateTime'>":
Expand Down Expand Up @@ -121,13 +119,17 @@ class OsisoftPiConnector(AbstractConnector):
("compressing", {"Type": "int", "Name": "Compression"}),
("changer", {"Type": "str", "Name": "Modified By"}),
]
DEFAULT_PAGE_SIZE = 200000
ABSOLUTE_MAX_VALUES_TO_READ = 1000000000

def __init__(self, conn_name="pi_client", server_name="default", **kwargs):
super(OsisoftPiConnector, self).__init__(conn_name)
self._server = None
self._server_name = server_name
self._page_size = (
int(kwargs["page_size"]) if "page_size" in kwargs else DEFAULT_PAGE_SIZE
int(kwargs["page_size"])
if "page_size" in kwargs
else OsisoftPiConnector.DEFAULT_PAGE_SIZE
)

@staticmethod
Expand All @@ -146,7 +148,7 @@ def list_connection_fields():
"name": "Data Read Page Size",
"type": "list",
"values": ["200000", "20000", "10000", "5000"],
"default_value": DEFAULT_PAGE_SIZE,
"default_value": OsisoftPiConnector.DEFAULT_PAGE_SIZE,
"optional": False,
},
}
Expand Down Expand Up @@ -189,6 +191,7 @@ def connect(self):
)

try:
# https://docs.aveva.com/bundle/af-sdk/page/html/M_OSIsoft_AF_PI_PIServer_Connect.htm
self._server.Connect(force=True)

log.debug(f"Connected to {self._server_name}, page_size={self._page_size}")
Expand Down Expand Up @@ -313,6 +316,7 @@ def read_tag_values_period(
first_timestamp=None,
last_timestamp=None,
time_frequency=None,
max_results=None,
result_format="dataframe",
progress_callback=None,
):
Expand All @@ -325,6 +329,8 @@ def read_tag_values_period(
if isinstance(last_timestamp, datetime):
last_timestamp = last_timestamp.strftime("%Y/%m/%d %H:%M:%S")

total_values_to_read = max_results or self.ABSOLUTE_MAX_VALUES_TO_READ

assert result_format in ["dataframe", "series", "tuple"]

names = tags
Expand Down Expand Up @@ -364,20 +370,18 @@ def read_tag_values_period(
time_span = AFTimeSpan.Parse(freq)

next_start_time = start_time
next_end_time = (
time_span.Multiply(next_start_time, self._page_size)
if time_span.Multiply(next_start_time, self._page_size)
< time_range.EndTime
else time_range.EndTime
)

# print('starting')
# print(f'range: {time_range} next_start_time={next_start_time}, next_end_time={next_end_time}')

while next_start_time < time_range.EndTime:
while (
next_start_time < time_range.EndTime
and total_values_to_read > 0
):
values_to_read = min(self._page_size, total_values_to_read)

next_end_time = (
time_span.Multiply(next_start_time, self._page_size)
if time_span.Multiply(next_start_time, self._page_size)
time_span.Multiply(next_start_time, values_to_read - 1)
if time_span.Multiply(next_start_time, values_to_read - 1)
< time_range.EndTime
else time_range.EndTime
)
Expand All @@ -392,6 +396,8 @@ def read_tag_values_period(
if records.Count == 0:
break

total_values_to_read -= records.Count

formatted_data = {
timestamp_to_datetime(val.Timestamp.UtcTime): val.Value
for val in records
Expand Down Expand Up @@ -419,19 +425,26 @@ def read_tag_values_period(
# print('starting')
# print(f'range: {time_range} next_start_time={next_start_time}')

while next_start_time < time_range.EndTime:
while (
next_start_time < time_range.EndTime
and total_values_to_read > 0
):
page_time_range = AFTimeRange(
next_start_time, time_range.EndTime
)

values_to_read = min(self._page_size, total_values_to_read)

# https://docs.aveva.com/bundle/af-sdk/page/html/M_OSIsoft_AF_PI_PIPoint_RecordedValues.htm
records = pt.RecordedValues(
page_time_range, boundary, "", False, self._page_size
page_time_range, boundary, "", False, values_to_read
)

if records.Count == 0:
break

total_values_to_read -= records.Count

formatted_data = {
timestamp_to_datetime(val.Timestamp.UtcTime): val.Value
for val in records
Expand Down
47 changes: 32 additions & 15 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,18 @@ def test_read_tag_values_period_interpolated(target_conn):
time_frequency="1 minute",
)

assert len(df) == 6001
assert len(df) == 6000
assert list(df.columns) == ["SINUSOIDU"]

df = target_conn.read_tag_values_period(
["sinusoidu"],
first_timestamp="*-100h",
last_timestamp="*",
time_frequency="1 minute",
max_results=10,
)

assert len(df) == 10
assert list(df.columns) == ["SINUSOIDU"]

df = target_conn.read_tag_values_period(
Expand All @@ -96,23 +107,11 @@ def test_read_tag_values_period_interpolated(target_conn):
time_frequency="3 minutes",
)

assert len(df) == 2001
assert len(df) == 2000
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]


def test_read_tag_values_period(target_conn):
# target_conn.read_tag_values(["sinusoid", "sinusoidu", "cdt158", "cdm158"],
# first_timestamp='2022/09/02 00:00:05',
# last_timestamp='2022/09/10 00:00:10',
# )

# df = target_conn.read_tag_values_period(["sinusoid", "sinusoidu"],
# first_timestamp='*-100h',
# last_timestamp='*',
# )
# assert 72 > len(df) > 10
# assert list(df.columns) == ['SINUSOID', 'SINUSOIDU']

def test_read_tag_values_period_recorded(target_conn):
df = target_conn.read_tag_values_period(
["sinusoid", "sinusoidu"],
# first_timestamp='*-100h',
Expand All @@ -129,6 +128,24 @@ def test_read_tag_values_period(target_conn):
)
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]

df = target_conn.read_tag_values_period(
["sinusoid", "sinusoidu"],
first_timestamp="2020-04-15 12:00:00",
last_timestamp="2020-05-16 12:00:00",
max_results=10,
)
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]
assert len(df) == 10

df = target_conn.read_tag_values_period(
["sinusoid", "sinusoidu"],
first_timestamp="2020-04-15 12:00:00",
last_timestamp="2020-12-16 12:00:00",
max_results=2000,
)
assert list(df.columns) == ["SINUSOID", "SINUSOIDU"]
assert len(df) == 2000


def test_read_tag_attributes(target_conn):
res = target_conn.read_tag_attributes(
Expand Down