From fd845e6a37018f9889c9b9b0a90bb0b204aa7b31 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 15 Nov 2023 15:13:50 -0600 Subject: [PATCH] add support for max_results argument --- README.md | 11 ++++++ src/data_agent_osisoft_pi/connector.py | 45 +++++++++++++++--------- tests/test_connector.py | 47 ++++++++++++++++++-------- 3 files changed, 72 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 4af56f7..4faeb6b 100644 --- a/README.md +++ b/README.md @@ -20,3 +20,14 @@ dagent exec list_tags --conn_name=pi dagent exec read_tag_values_period --conn_name=pi --tags="['sinusoid', 'sinusoidu']" --first_timestamp=*-100h --last_timestamp=* dagent exec copy_period --src_conn=pi --tags="['SINUSOID', 'sinusoidu']" --dest_conn=csv --dest_group='sinus.csv' --first_timestamp=*-100h --last_timestamp=* ``` + + +## Troubleshooting + +### OSIsoft.AF.PI.PITimeoutException when reading historical data + +Increase the SDK data access timeout settings on the client machine. There are two timeouts for the SDK: a connection timeout and a data access timeout. The connection timeout default is 10 seconds. The data access timeout default is 60 seconds. Data access timeouts are the most likely cause of the error. +* Launch AboutPI-SDK.exe. +* Navigate to the Connections tab. +* Select the PI Data Archive in question. +* Increase the Data Access Timeout and Connection Timeout to 120 seconds or more. 
diff --git a/src/data_agent_osisoft_pi/connector.py b/src/data_agent_osisoft_pi/connector.py index 3a6f623..568c6b8 100644 --- a/src/data_agent_osisoft_pi/connector.py +++ b/src/data_agent_osisoft_pi/connector.py @@ -87,8 +87,6 @@ def timestamp_to_datetime(timestamp): MAP_TIME_FREQUENCY_TO_PI = {"raw data": None} -DEFAULT_PAGE_SIZE = 200000 - def cast2python(val): if str(type(val)) == "": @@ -121,13 +119,17 @@ class OsisoftPiConnector(AbstractConnector): ("compressing", {"Type": "int", "Name": "Compression"}), ("changer", {"Type": "str", "Name": "Modified By"}), ] + DEFAULT_PAGE_SIZE = 200000 + ABSOLUTE_MAX_VALUES_TO_READ = 1000000000 def __init__(self, conn_name="pi_client", server_name="default", **kwargs): super(OsisoftPiConnector, self).__init__(conn_name) self._server = None self._server_name = server_name self._page_size = ( - int(kwargs["page_size"]) if "page_size" in kwargs else DEFAULT_PAGE_SIZE + int(kwargs["page_size"]) + if "page_size" in kwargs + else OsisoftPiConnector.DEFAULT_PAGE_SIZE ) @staticmethod @@ -146,7 +148,7 @@ def list_connection_fields(): "name": "Data Read Page Size", "type": "list", "values": ["200000", "20000", "10000", "5000"], - "default_value": DEFAULT_PAGE_SIZE, + "default_value": OsisoftPiConnector.DEFAULT_PAGE_SIZE, "optional": False, }, } @@ -189,6 +191,7 @@ def connect(self): ) try: + # https://docs.aveva.com/bundle/af-sdk/page/html/M_OSIsoft_AF_PI_PIServer_Connect.htm self._server.Connect(force=True) log.debug(f"Connected to {self._server_name}, page_size={self._page_size}") @@ -313,6 +316,7 @@ def read_tag_values_period( first_timestamp=None, last_timestamp=None, time_frequency=None, + max_results=None, result_format="dataframe", progress_callback=None, ): @@ -325,6 +329,8 @@ def read_tag_values_period( if isinstance(last_timestamp, datetime): last_timestamp = last_timestamp.strftime("%Y/%m/%d %H:%M:%S") + total_values_to_read = max_results or self.ABSOLUTE_MAX_VALUES_TO_READ + assert result_format in ["dataframe", "series", 
"tuple"] names = tags @@ -364,20 +370,18 @@ def read_tag_values_period( time_span = AFTimeSpan.Parse(freq) next_start_time = start_time - next_end_time = ( - time_span.Multiply(next_start_time, self._page_size) - if time_span.Multiply(next_start_time, self._page_size) - < time_range.EndTime - else time_range.EndTime - ) # print('starting') - # print(f'range: {time_range} next_start_time={next_start_time}, next_end_time={next_end_time}') - while next_start_time < time_range.EndTime: + while ( + next_start_time < time_range.EndTime + and total_values_to_read > 0 + ): + values_to_read = min(self._page_size, total_values_to_read) + next_end_time = ( - time_span.Multiply(next_start_time, self._page_size) - if time_span.Multiply(next_start_time, self._page_size) + time_span.Multiply(next_start_time, values_to_read - 1) + if time_span.Multiply(next_start_time, values_to_read - 1) < time_range.EndTime else time_range.EndTime ) @@ -392,6 +396,8 @@ def read_tag_values_period( if records.Count == 0: break + total_values_to_read -= records.Count + formatted_data = { timestamp_to_datetime(val.Timestamp.UtcTime): val.Value for val in records @@ -419,19 +425,26 @@ def read_tag_values_period( # print('starting') # print(f'range: {time_range} next_start_time={next_start_time}') - while next_start_time < time_range.EndTime: + while ( + next_start_time < time_range.EndTime + and total_values_to_read > 0 + ): page_time_range = AFTimeRange( next_start_time, time_range.EndTime ) + values_to_read = min(self._page_size, total_values_to_read) + # https://docs.aveva.com/bundle/af-sdk/page/html/M_OSIsoft_AF_PI_PIPoint_RecordedValues.htm records = pt.RecordedValues( - page_time_range, boundary, "", False, self._page_size + page_time_range, boundary, "", False, values_to_read ) if records.Count == 0: break + total_values_to_read -= records.Count + formatted_data = { timestamp_to_datetime(val.Timestamp.UtcTime): val.Value for val in records diff --git a/tests/test_connector.py 
b/tests/test_connector.py index 80a46f9..851abeb 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -76,7 +76,18 @@ def test_read_tag_values_period_interpolated(target_conn): time_frequency="1 minute", ) - assert len(df) == 6001 + assert len(df) == 6000 + assert list(df.columns) == ["SINUSOIDU"] + + df = target_conn.read_tag_values_period( + ["sinusoidu"], + first_timestamp="*-100h", + last_timestamp="*", + time_frequency="1 minute", + max_results=10, + ) + + assert len(df) == 10 assert list(df.columns) == ["SINUSOIDU"] df = target_conn.read_tag_values_period( @@ -96,23 +107,11 @@ def test_read_tag_values_period_interpolated(target_conn): time_frequency="3 minutes", ) - assert len(df) == 2001 + assert len(df) == 2000 assert list(df.columns) == ["SINUSOID", "SINUSOIDU"] -def test_read_tag_values_period(target_conn): - # target_conn.read_tag_values(["sinusoid", "sinusoidu", "cdt158", "cdm158"], - # first_timestamp='2022/09/02 00:00:05', - # last_timestamp='2022/09/10 00:00:10', - # ) - - # df = target_conn.read_tag_values_period(["sinusoid", "sinusoidu"], - # first_timestamp='*-100h', - # last_timestamp='*', - # ) - # assert 72 > len(df) > 10 - # assert list(df.columns) == ['SINUSOID', 'SINUSOIDU'] - +def test_read_tag_values_period_recorded(target_conn): df = target_conn.read_tag_values_period( ["sinusoid", "sinusoidu"], # first_timestamp='*-100h', @@ -129,6 +128,24 @@ def test_read_tag_values_period(target_conn): ) assert list(df.columns) == ["SINUSOID", "SINUSOIDU"] + df = target_conn.read_tag_values_period( + ["sinusoid", "sinusoidu"], + first_timestamp="2020-04-15 12:00:00", + last_timestamp="2020-05-16 12:00:00", + max_results=10, + ) + assert list(df.columns) == ["SINUSOID", "SINUSOIDU"] + assert len(df) == 10 + + df = target_conn.read_tag_values_period( + ["sinusoid", "sinusoidu"], + first_timestamp="2020-04-15 12:00:00", + last_timestamp="2020-12-16 12:00:00", + max_results=2000, + ) + assert list(df.columns) == ["SINUSOID", "SINUSOIDU"] + 
assert len(df) == 2000 + def test_read_tag_attributes(target_conn): res = target_conn.read_tag_attributes(