Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New extract_cellset_axes_raw_async, extract_cellset_cell_async and extract_cellset_axes_cardinality #957

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions TM1py/Services/CellService.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import functools
import itertools
import json
import math
import uuid
import warnings
from collections import OrderedDict
Expand Down Expand Up @@ -2919,6 +2920,179 @@ def extract_cellset_cells_raw(
response = self._rest.GET(url=url, **kwargs)
return response.json()

def extract_cellset_axes_cardinality(self, cellset_id: str):
url = "/api/v1/Cellsets('{cellset_id}')?$expand=Axes($select=Cardinality)".format(cellset_id=cellset_id)
response = self._rest.GET(url=url)
return response.json()

def extract_cellset_axes_raw_async(
self,
cellset_id: str,
async_axis: int = 1,
max_workers: int = 8,
elem_properties: Iterable[str] = None,
member_properties: Iterable[str] = None,
skip_contexts: bool = False,
include_hierarchies: bool = False,
sandbox_name: str = None,
**kwargs):
""" Extract cellset axes asynchronously

:param cellset_id: String; ID of existing cellset
:param async_axis: determines which axis will be extracted asynchronously
:param max_workers: Max number of threads, e.g. 14
:param elem_properties: List of properties to be queried from elements. E.g. ['UniqueName','Attributes', ...]
:param member_properties: List properties to be queried from the member. E.g. ['Name', 'UniqueName']
:param skip_contexts: skip elements from titles / contexts in response
:param sandbox_name: str
:param include_hierarchies: retrieve Hierarchies property on Axes
:return: Raw format from TM1.
"""

axes_cardinality = self.extract_cellset_axes_cardinality(cellset_id=cellset_id)

if async_axis >= len(axes_cardinality['Axes']):
raise ValueError("Argument 'async_axis' must be less than axes cardinality")

# select Name property if member_properties is None or empty.
# Necessary, as tm1 default behaviour is to return all properties if no $select is specified in the request.
if member_properties is None or len(list(member_properties)) == 0:
member_properties = ["Name"]
select_member_properties = "$select={}".format(",".join(member_properties))

expand_elem_properties = ";$expand=Element($select={elem_properties})".format(
elem_properties=",".join(elem_properties)) \
if elem_properties is not None and len(list(elem_properties)) > 0 \
else ""

if include_hierarchies:
expand_hierarchies = "Hierarchies($select=Name;$expand=Dimension($select=Name)),"
else:
expand_hierarchies = ""

def _extract_cellset_axis_raw(axis: int = async_axis, partition: int = 0, partition_size: int = 0):
top = partition_size
skip = partition * partition_size
filter_axis = "$filter=Ordinal eq {axis};".format(axis=axis)
url = "/api/v1/Cellsets('{cellset_id}')?$expand=" \
"Axes({filter_axis}$expand={hierarchies}Tuples($expand=Members({select_member_properties}" \
"{expand_elem_properties}){partition}))" \
.format(cellset_id=cellset_id,
axis=axis,
partition=f";$top={top};$skip={skip}" if partition_size > 0 else "",
filter_axis=filter_axis,
hierarchies=expand_hierarchies,
select_member_properties=select_member_properties,
expand_elem_properties=expand_elem_properties)
url = add_url_parameters(url, **{"!sandbox": sandbox_name})
response = self._rest.GET(url=url, **kwargs)

return response.json()

async def _extract_cellset_axes_raw_async():
partition_size = math.ceil(axes_cardinality['Axes'][async_axis]['Cardinality'] / max_workers)
loop = asyncio.get_event_loop()
result_list = []
with ThreadPoolExecutor(max_workers) as executor:
futures = [
loop.run_in_executor(executor, _extract_cellset_axis_raw, async_axis, partition, partition_size)
for partition in range(max_workers)]

for future in futures:
result = await future
result_list = result_list + result['Axes'][0]['Tuples']
return result_list

# Extract non-asynchronous axis
axes = _extract_cellset_axis_raw(axis=1-async_axis)
# Extract tuples for asynchronous axis
async_axis_tuples = asyncio.run(_extract_cellset_axes_raw_async())
# Combine results
axes['Axes'].insert(async_axis,
{'Ordinal': async_axis, 'Cardinality': axes_cardinality['Axes'][async_axis]['Cardinality'],
'Tuples': async_axis_tuples})

if not skip_contexts:
context = _extract_cellset_axis_raw(axis=2)
if len(context['Axes']) > 0:
axes['Axes'].append(context['Axes'][0])

return axes

def extract_cellset_cells_raw_async(
self,
cellset_id: str,
max_workers: int = 8,
cell_properties: Iterable[str] = None,
skip_zeros: bool = False,
skip_consolidated_cells: bool = False,
skip_rule_derived_cells: bool = False,
sandbox_name: str = None,
**kwargs):

if not cell_properties:
cell_properties = ['Value']

if skip_rule_derived_cells:
cell_properties.append("RuleDerived")
# necessary due to bug in TM1 11.8: If only RuleDerived is retrieved it occasionally produces wrong results
cell_properties.append("Updateable")

if skip_consolidated_cells:
cell_properties.append("Consolidated")

if skip_zeros or skip_rule_derived_cells or skip_consolidated_cells:
if 'Ordinal' not in cell_properties:
cell_properties.append('Ordinal')

filter_cells = ""
if skip_zeros or skip_consolidated_cells or skip_rule_derived_cells:
filters = []
if skip_zeros:
filters.append("Value ne 0 and Value ne null and Value ne ''")
if skip_consolidated_cells:
filters.append("Consolidated eq false")
if skip_rule_derived_cells:
filters.append("RuleDerived eq false")

filter_cells = " and ".join(filters)

def _extract_cellset_cells_raw(partition: int = 0, partition_size: int = 0):
top = partition_size
skip = partition * partition_size

url = "/api/v1/Cellsets('{cellset_id}')?$expand=" \
"Cells($select={cell_properties}{top_cells}{skip_cells}{filter_cells})" \
.format(cellset_id=cellset_id,
cell_properties=",".join(cell_properties),
top_cells=f";$top={top}" if top else "",
skip_cells=f";$skip={skip}" if skip else "",
filter_cells=f";$filter={filter_cells}" if filter_cells else "")

url = add_url_parameters(url, **{"!sandbox": sandbox_name})
response = self._rest.GET(url=url, **kwargs)

return response.json()

async def _extract_cellset_cells_raw_async():
cellcount = self.extract_cellset_cellcount(cellset_id=cellset_id, sandbox_name=sandbox_name,
delete_cellset=False)
partition_size = math.ceil(cellcount / max_workers)
loop = asyncio.get_event_loop()
result_list = []
with ThreadPoolExecutor(max_workers) as executor:
futures = [loop.run_in_executor(executor, _extract_cellset_cells_raw, partition, partition_size)
for partition in range(max_workers)]
for future in futures:
result = await future
result_list = result_list + result['Cells']
cells = {'@odata.context': result['@odata.context'], 'ID': result['ID'], 'Cells': result_list}
return cells

cells = asyncio.run(_extract_cellset_cells_raw_async())

return cells

@tidy_cellset
@odata_compact_json(return_as_dict=False)
def extract_cellset_values(self, cellset_id: str, sandbox_name: str = None, use_compact_json: bool = False,
Expand Down
146 changes: 146 additions & 0 deletions Tests/CellService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4262,6 +4262,152 @@ def test_check_feeders_dimensions_string_multi_hierarchy(self):

self.assertEqual(result['@odata.context'], '../$metadata#Collection(ibm.tm1.api.v1.FedCellDescriptor)')

def test_extract_cellset_axes_raw_async(self):
mdx = MdxBuilder.from_cube(self.cube_name) \
.rows_non_empty() \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[0], self.dimension_names[0])) \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[1], self.dimension_names[1])) \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimension_names[2], self.dimension_names[2])) \
.to_mdx()

cellset_id = self.tm1.cells.create_cellset(mdx=mdx)
data_async0 = self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id, async_axis=0)
data_async1 = self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id)
data = self.tm1.cubes.cells.extract_cellset_metadata_raw(cellset_id=cellset_id)
self.assertEqual(
data['Axes'], data_async0['Axes'])
self.assertEqual(
data['Axes'], data_async1['Axes'])

def test_extract_cellset_axes_raw_async_without_rows(self):
mdx = MdxBuilder.from_cube(self.cube_name) \
.columns_non_empty() \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimension_names[0], self.dimension_names[0])) \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimension_names[1], self.dimension_names[1])) \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimension_names[2], self.dimension_names[2])) \
.to_mdx()

cellset_id = self.tm1.cells.create_cellset(mdx=mdx)
data_async0 = self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id, async_axis=0)
data = self.tm1.cubes.cells.extract_cellset_metadata_raw(cellset_id=cellset_id)
self.assertEqual(
data['Axes'], data_async0['Axes'])
print('axes empty row', len(data['Axes']))
with self.assertRaises(ValueError) as _:
self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id)

def test_extract_cellset_axes_raw_async_with_empty_columns(self):
mdx = MdxBuilder.from_cube(self.cube_name) \
.rows_non_empty().add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[0], self.dimension_names[0])) \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[1], self.dimension_names[1])) \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[2], self.dimension_names[2])) \
.columns_non_empty().add_hierarchy_set_to_column_axis(MdxHierarchySet.from_str("", "", "{}")) \
.to_mdx()

cellset_id = self.tm1.cells.create_cellset(mdx=mdx)
data_async0 = self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id, async_axis=0)
data_async1 = self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id)
data = self.tm1.cubes.cells.extract_cellset_metadata_raw(cellset_id=cellset_id)
self.assertEqual(
data['Axes'], data_async0['Axes'])
self.assertEqual(
data['Axes'], data_async1['Axes'])

def test_extract_cellset_axes_raw_async_with_member_properties_with_elem_properties(self):
mdx = MdxBuilder.from_cube(self.cube_name) \
.rows_non_empty() \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[0], self.dimension_names[0])) \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[1], self.dimension_names[1])) \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimension_names[2], self.dimension_names[2])) \
.to_mdx()

cellset_id = self.tm1.cells.create_cellset(mdx=mdx)
elem_properties = ["Name", "UniqueName", "Attributes/Attr1", "Attributes/Attr2"]
member_properties = ["Name", "Ordinal", "Weight"]
data_async0 = self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id, async_axis=0,
elem_properties=elem_properties,
member_properties=member_properties)
data_async1 = self.tm1.cubes.cells.extract_cellset_axes_raw_async(cellset_id=cellset_id,
elem_properties=elem_properties,
member_properties=member_properties)
data = self.tm1.cubes.cells.extract_cellset_metadata_raw(cellset_id=cellset_id,
elem_properties=elem_properties,
member_properties=member_properties)
self.assertEqual(
data['Axes'], data_async0['Axes'])
self.assertEqual(
data['Axes'], data_async1['Axes'])

def test_extract_cellset_cells_raw_async(self):
mdx = MdxBuilder.from_cube(self.cube_name) \
.rows_non_empty() \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[0], self.dimension_names[0])) \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[1], self.dimension_names[1])) \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimension_names[2], self.dimension_names[2])) \
.to_mdx()

cellset_id = self.tm1.cells.create_cellset(mdx=mdx)
data_async = self.tm1.cubes.cells.extract_cellset_cells_raw_async(cellset_id=cellset_id)
data = self.tm1.cubes.cells.extract_cellset_cells_raw(cellset_id=cellset_id)
self.assertEqual(
data['Cells'], data_async['Cells'])

def test_extract_cellset_cells_raw_async_with_cell_properties(self):
mdx = MdxBuilder.from_cube(self.cube_name) \
.rows_non_empty() \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[0], self.dimension_names[0])) \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimension_names[1], self.dimension_names[1])) \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimension_names[2], self.dimension_names[2])) \
.to_mdx()

cellset_id = self.tm1.cells.create_cellset(mdx=mdx)
cell_properties = ['Value', 'Updateable', 'Consolidated', 'RuleDerived']
data_async = self.tm1.cubes.cells.extract_cellset_cells_raw_async(cellset_id=cellset_id,
cell_properties=cell_properties)
data = self.tm1.cubes.cells.extract_cellset_cells_raw(cellset_id=cellset_id,
cell_properties=cell_properties)
self.assertEqual(
data['Cells'], data_async['Cells'])

def test_extract_cellset_cells_raw_async_skip_consolidated(self):
self.tm1.cells.write_values(self.cube_with_consolidations_name, self.cellset)
mdx = MdxBuilder.from_cube(self.cube_with_consolidations_name) \
.rows_non_empty() \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimensions_with_consolidations_names[0],
self.dimensions_with_consolidations_names[0])) \
.add_hierarchy_set_to_row_axis(
MdxHierarchySet.all_members(self.dimensions_with_consolidations_names[1],
self.dimensions_with_consolidations_names[1])) \
.add_hierarchy_set_to_column_axis(
MdxHierarchySet.all_members(self.dimensions_with_consolidations_names[2],
self.dimensions_with_consolidations_names[2])) \
.to_mdx()

cellset_id = self.tm1.cells.create_cellset(mdx=mdx)
data_async = self.tm1.cubes.cells.extract_cellset_cells_raw_async(cellset_id=cellset_id)
data = self.tm1.cubes.cells.extract_cellset_cells_raw(cellset_id=cellset_id)
self.assertEqual(
data['Cells'], data_async['Cells'])

# Delete Cube and Dimensions
@classmethod
def tearDownClass(cls):
Expand Down