From 27447aaf8885a57e6e0cfe86ad8bba993bc712a2 Mon Sep 17 00:00:00 2001 From: hkoertge Date: Mon, 25 Nov 2024 11:42:38 +0100 Subject: [PATCH 1/5] updated for geojson support --- .../powerfactory/pp_import_functions.py | 126 +++++++++--------- 1 file changed, 66 insertions(+), 60 deletions(-) diff --git a/pandapower/converter/powerfactory/pp_import_functions.py b/pandapower/converter/powerfactory/pp_import_functions.py index 065cd3364..4dd9c7692 100644 --- a/pandapower/converter/powerfactory/pp_import_functions.py +++ b/pandapower/converter/powerfactory/pp_import_functions.py @@ -3,14 +3,17 @@ import numbers import re from itertools import combinations +from typing import Literal, Optional, Union +import geojson import networkx as nx - import numpy as np +from pandas import DataFrame + import pandapower as pp +from pandapower.results import reset_results from pandapower.auxiliary import ADict import pandapower.control as control -from pandas import DataFrame, Series try: import pandaplan.core.pplog as logging @@ -23,6 +26,12 @@ # make wrapper for GetAttribute def ga(element, attr): return element.GetAttribute(attr) +# Define global variables +line_dict = {} +trafo_dict = {} +switch_dict = {} +bus_dict = {} +grf_map = {} # import network to pandapower: @@ -30,10 +39,9 @@ def from_pf(dict_net, pv_as_slack=True, pf_variable_p_loads='plini', pf_variable flag_graphics='GPS', tap_opt="nntap", export_controller=True, handle_us="Deactivate", max_iter=None, is_unbalanced=False, create_sections=True): global line_dict + global line_dict, trafo_dict, switch_dict, bus_dict, grf_map line_dict = {} - global trafo_dict trafo_dict = {} - global switch_dict switch_dict = {} logger.debug("__name__: %s" % __name__) logger.debug('started from_pf') @@ -45,10 +53,8 @@ def from_pf(dict_net, pv_as_slack=True, pf_variable_p_loads='plini', pf_variable grid_name = dict_net['ElmNet'].loc_name base_sn_mva = dict_net['global_parameters']['base_sn_mva'] net = pp.create_empty_network(grid_name, sn_mva=base_sn_mva) - net['bus_geodata'] = DataFrame(columns=['x', 'y']) - net['line_geodata'] = DataFrame(columns=['coords']) - pp.results.reset_results(net, mode="pf_3ph") + reset_results(net, mode="pf_3ph") if max_iter is not None: pp.set_user_pf_options(net, max_iteration=max_iter) logger.info('creating grid %s' % grid_name) @@ -57,9 +63,7 @@ def from_pf(dict_net, pv_as_slack=True, pf_variable_p_loads='plini', pf_variable logger.debug('creating buses') # create buses: - global bus_dict bus_dict = {} - global grf_map grf_map = dict_net.get('graphics', {}) logger.debug('the graphic mapping is: %s' % grf_map) @@ -375,11 +379,12 @@ def create_bus(net, item, flag_graphics, is_unbalanced): else: x, y = 0, 0 - # only values > 0+-1e-3 are entered into the bus_geodata - if x > 1e-3 or y > 1e-3: - geodata = (x, y) - else: - geodata = None + # Commented out because geojson is set up to do the precision handling + # # only values > 0+-1e-3 are entered into the bus.geo + # if x > 1e-3 or y > 1e-3: + # geodata = (x, y) + # else: + # geodata = None usage = ["b", "m", "n"] params = { @@ -387,7 +392,7 @@ def create_bus(net, item, flag_graphics, is_unbalanced): 'vn_kv': item.uknom, 'in_service': not bool(item.outserv), 'type': usage[item.iUsage], - 'geodata': geodata + 'geodata': geojson.dumps(geojson.Point((x, y))), } system_type = {0: "ac", 1: "dc", 2: "ac/bi"}[item.systype] @@ -625,23 +630,17 @@ def create_connection_switches(net, item, number_switches, et, buses, elements): def get_coords_from_buses(net, from_bus, to_bus, **kwargs): - coords = [] - if from_bus in net.bus_geodata.index: - x1, y1 = net.bus_geodata.loc[from_bus, ['x', 'y']] - has_coords = True - else: - x1, y1 = np.nan, np.nan - has_coords = False + coords: list[tuple[float, float]] = [] + from_geo: Optional[str] = None + to_geo: Optional[str] = None + if from_bus in net.bus.index: + from_geo: str = net.bus.loc[from_bus, ['geo']] - if to_bus in net.bus_geodata.index: - x2, y2 = net.bus_geodata.loc[to_bus, ['x', 'y']] - has_coords = True - else: - x2, y2 = np.nan, np.nan - has_coords = False + if to_bus in net.bus.index: + to_geo: str = net.bus.loc[to_bus, ['geo']] - if has_coords: - coords = [[x1, y1], [x2, y2]] + if from_geo and to_geo: + coords = [geojson.utils.coords(geojson.loads(from_geo)), geojson.utils.coords(geojson.loads(to_geo))] logger.debug('got coords from buses: %s' % coords) else: logger.debug('no coords for line between buses %d and %d' % (from_bus, to_bus)) @@ -656,7 +655,7 @@ def get_coords_from_item(item): c = tuple((x, y) for [y, x] in coords) except ValueError: try: - c = tuple((x, y) for [y, x, z] in coords) + c = tuple((x, y, z) for [y, x, z] in coords) except ValueError: c = [] return c @@ -684,7 +683,6 @@ def get_coords_from_grf_object(item): if len(coords) == 0: coords = [[graphic_object.rCenterX, graphic_object.rCenterY]] * 2 logger.debug('extracted line coords from graphic object: %s' % coords) - # net.line_geodata.loc[lid, 'coords'] = coords else: coords = [] @@ -859,6 +857,11 @@ def get_section_coords(coords, sec_len, start_len, scale_factor): def segment_buses(net, bus1, bus2, num_sections, line_name): # , sec_len, start_len, coords): + """ + splits bus1, bus2 line so that it creates num_sections amount of lines. + Yields start, end for each line segment. + e.g. Yields bus1, a, a, bus2 for num_sections = 2. + """ yield bus1 m = 1 # if coords: @@ -877,8 +880,10 @@ def segment_buses(net, bus1, bus2, num_sections, line_name): # , sec_len, start bus_name = "%s (Muff %u)" % (line_name, m) vn_kv = net.bus.at[bus1, "vn_kv"] zone = net.bus.at[bus1, "zone"] - k = pp.create_bus(net, name=bus_name, type='ls', vn_kv=vn_kv, zone=zone) + bus = pp.create_bus(net, name=bus_name, type='ls', vn_kv=vn_kv, zone=zone) + # TODO: implement coords for segmentation buses. + # Handle coords if line has multiple coords. # if coords: # split_len += sec_len[m - 1] * scale_factor # @@ -888,9 +893,9 @@ def segment_buses(net, bus1, bus2, num_sections, line_name): # , sec_len, start # logger.warning('bus %d has 0 coords, bus1: %d, bus2: %d' % k, bus1, bus2) if "description" in net.bus: - net.bus.at[k, "description"] = u"" - yield k - yield k + net.bus.at[bus, "description"] = "" + yield bus + yield bus m += 1 else: yield bus2 @@ -928,10 +933,10 @@ def create_line_sections(net, item_list, line, bus1, bus2, coords, parallel, is_ scaling_factor = sum(sec_len) / calc_len_coords(coords) sec_coords = get_section_coords(coords, sec_len=item.dline, start_len=item.rellen, scale_factor=scaling_factor) - net.line_geodata.loc[sid, 'coords'] = sec_coords + net.line.loc[sid, 'geo'] = geojson.dumps(geojson.LineString(sec_coords)) # p1 = sec_coords[0] # p2 = sec_coords[-1] - net.bus_geodata.loc[bus2, ['x', 'y']] = sec_coords[-1] + net.bus.loc[bus2, ['geo']] = geojson.dumps(geojson.Point(sec_coords[-1])) except ZeroDivisionError: logger.warning("Could not generate geodata for line !!") @@ -1589,9 +1594,9 @@ def split_line_add_bus_old(net, item, parent): raise RuntimeError('incorrect length for section %s: %.3f' % (sec, sec_len_b)) # get coords - if sid in net.line_geodata.index.values: + if net.line.at[sid, 'geo'].notna(): logger.debug('line has coords') - coords = net.line_geodata.at[sid, 'coords'] + coords = geojson.utils.coords(geojson.loads(net.line.at[sid, 'geo'])) logger.debug('old geodata of line %d: %s' % (sid, coords)) # get coords for 2 split lines @@ -1619,12 +1624,17 @@ def split_line_add_bus_old(net, item, parent): logger.debug('created new bus in net: %s' % net.bus.loc[bus]) # create new line - lid = pp.create_line(net, from_bus=bus, to_bus=net.line.at[sid, 'to_bus'], - length_km=sec_len_b, - std_type=net.line.at[sid, 'std_type'], - name=net.line.at[sid, 'name'], df=net.line.at[sid, 'df']) + lid = pp.create_line( + net, + from_bus=bus, + to_bus=net.line.at[sid, 'to_bus'], + length_km=sec_len_b, + std_type=net.line.at[sid, 'std_type'], + name=net.line.at[sid, 'name'], + df=net.line.at[sid, 'df'], + geodata=coords_b + ) net.line.at[lid, 'section'] = net.line.at[sid, 'section'] - net.line_geodata.loc[lid, 'coords'] = coords_b if not net.line.loc[sid, 'section_idx']: net.line.loc[sid, 'section_idx'] = 0 @@ -1635,7 +1645,7 @@ def split_line_add_bus_old(net, item, parent): net.line.at[sid, 'to_bus'] = bus net.line.at[sid, 'length_km'] = sec_len_a - net.line_geodata.loc[sid, 'coords'] = coords_a + net.line.at[sid, 'geo'] = geojson.dumps(geojson.LineString(coords_a)) logger.debug('changed: %s' % net.line.loc[sid]) else: # no new bus/line are created: take the to_bus @@ -3324,20 +3334,17 @@ def split_line_at_length(net, line, length_pos): if 'max_loading_percent' in net.line.columns: net.line.loc[new_line, 'max_loading_percent'] = net.line.at[line, 'max_loading_percent'] - if 'line_geodata' in net.keys() and line in net.line_geodata.index.values: - coords = net.line_geodata.at[line, 'coords'] + if net.line.loc[line, 'geo'].notna(): + coords = geojson.utils.coords(geojson.loads(net.line.loc[line, 'geo'])) scaling_factor = old_length / calc_len_coords(coords) sec_coords_a = get_section_coords(coords, sec_len=length_pos, start_len=0., scale_factor=scaling_factor) - sec_coords_b = get_section_coords(coords, sec_len=new_length, start_len=length_pos, - scale_factor=scaling_factor) - - net.line_geodata.loc[line, 'coords'] = sec_coords_a - net.line_geodata.loc[new_line, 'coords'] = sec_coords_b + net.line.loc[line, 'geo'] = geojson.dumps(geojson.LineString(sec_coords_a)) + net.line.loc[new_line, 'geo'] = geojson.dumps(geojson.LineString(sec_coords_b)) - net.bus_geodata.loc[bus, ['x', 'y']] = sec_coords_b[0] + net.bus.loc[bus, ['geo']] = geojson.Point(sec_coords_b[0]) return bus @@ -3438,7 +3445,7 @@ def split_line(net, line_idx, pos_at_line, line_item): net.line.at[new_line, 'order'] = net.line.at[line_idx, 'order'] + 1 net.res_line.at[new_line, 'pf_loading'] = net.res_line.at[line_idx, 'pf_loading'] - if line_idx in net.line_geodata.index.values: + if line_idx in net.line.index: logger.debug('setting new coords') set_new_coords(net, new_bus, line_idx, new_line, line_length, pos_at_line) @@ -3503,7 +3510,7 @@ def break_coords_sections(coords, section_length, scale_factor_length): # set up new coordinates for line sections that are split by the new bus of the ElmLodlvp def set_new_coords(net, bus_id, line_idx, new_line_idx, line_length, pos_at_line): - line_coords = net.line_geodata.at[line_idx, 'coords'] + line_coords = net.line.at[line_idx, 'geo'] logger.debug('got coords for line %s' % line_idx) scale_factor_length = get_scale_factor(line_length, line_coords) @@ -3512,11 +3519,10 @@ def set_new_coords(net, bus_id, line_idx, new_line_idx, line_length, pos_at_line logger.debug('calculated new coords: %s, %s ' % (section_coords, new_coords)) - net.line_geodata.at[line_idx, 'coords'] = section_coords - net.line_geodata.at[new_line_idx, 'coords'] = new_coords + net.line.at[line_idx, 'geo'] = geojson.dumps(geojson.LineString(section_coords)) + net.line.at[new_line_idx, 'geo'] = geojson.dumps(geojson.LineString(new_coords)) - net.bus_geodata.at[bus_id, 'x'] = new_coords[0][0] - net.bus_geodata.at[bus_id, 'y'] = new_coords[0][1] + net.bus.at[bus_id, 'geo'] = geojson.dumps(geojson.Point(new_coords[0])) # gather info about ElmLodlvp in a dict From d8486f6ad5381fcb469d06d3ff0e8a0599755068 Mon Sep 17 00:00:00 2001 From: hkoertge Date: Mon, 25 Nov 2024 11:47:52 +0100 Subject: [PATCH 2/5] removed ga wrapper function for better code quality --- .../powerfactory/pp_import_functions.py | 202 +++++++++--------- 1 file changed, 99 insertions(+), 103 deletions(-) diff --git a/pandapower/converter/powerfactory/pp_import_functions.py b/pandapower/converter/powerfactory/pp_import_functions.py index 4dd9c7692..472f6445a 100644 --- a/pandapower/converter/powerfactory/pp_import_functions.py +++ b/pandapower/converter/powerfactory/pp_import_functions.py @@ -22,10 +22,6 @@ logger = logging.getLogger(__name__) - -# make wrapper for GetAttribute -def ga(element, attr): - return element.GetAttribute(attr) # Define global variables line_dict = {} trafo_dict = {} @@ -346,12 +342,12 @@ def add_additional_attributes(item, net, element, element_id, attr_list=None, at obj = item for a in attr.split('.'): if hasattr(obj, 'HasAttribute') and obj.HasAttribute(a): - obj = ga(obj, a) + obj = obj.GetAttributes(a) if obj is not None and isinstance(obj, str): net[element].loc[element_id, attr_dict[attr]] = obj elif item.HasAttribute(attr): - chr_name = ga(item, attr) + chr_name = item.GetAttributes(attr) if chr_name is not None: if isinstance(chr_name, (str, numbers.Number)): net[element].loc[element_id, attr_dict[attr]] = chr_name @@ -366,13 +362,13 @@ def add_additional_attributes(item, net, element, element_id, attr_list=None, at def create_bus(net, item, flag_graphics, is_unbalanced): # add geo data if flag_graphics == 'GPS': - x = ga(item, 'e:GPSlon') - y = ga(item, 'e:GPSlat') + x = item.GetAttributes('e:GPSlon') + y = item.GetAttributes('e:GPSlat') elif flag_graphics == 'graphic objects': graphic_object = get_graphic_object(item) if graphic_object: - x = ga(graphic_object, 'rCenterX') - y = ga(graphic_object, 'rCenterY') + x = graphic_object.GetAttributes('rCenterX') + y = graphic_object.GetAttributes('rCenterY') # add gr coord data else: x, y = 0, 0 @@ -472,7 +468,7 @@ def get_pf_bus_results(net, item, bid, is_unbalanced, system_type): for res_var_pp, res_var_pf in result_variables.items(): res = np.nan if item.HasResults(0): - res = ga(item, res_var_pf) + res = item.GetAttributes(res_var_pf) # dc bus voltage can be negative: net[bus_type].at[bid, res_var_pp] = np.abs(res) if "vm_pu" in res_var_pp else res @@ -480,7 +476,7 @@ def get_pf_bus_results(net, item, bid, is_unbalanced, system_type): # # This one deletes all the results :( # # Don't use it # def find_bus_index_in_net(item, net=None): -# foreign_key = int(ga(item, 'for_name')) +# foreign_key = int(item.GetAttributes('for_name')) # return foreign_key @@ -489,32 +485,32 @@ def get_pf_bus_results(net, item, bid, is_unbalanced, system_type): # def find_bus_index_in_net(item, net): # usage = ["b", "m", "n"] # # to be sure that the bus is the correct one -# name = ga(item, 'loc_name') -# bus_type = usage[ga(item, 'iUsage')] +# name = item.GetAttributes('loc_name') +# bus_type = usage[item.GetAttributes('iUsage')] # logger.debug('looking for bus <%s> in net' % name) # # if item.HasAttribute('cpSubstat'): -# substat = ga(item, 'cpSubstat') +# substat = item.GetAttributes('cpSubstat') # if substat is not None: -# descr = ga(substat, 'loc_name') +# descr = substat.GetAttributes('loc_name') # logger.debug('bus <%s> has substat, descr is <%s>' % (name, descr)) # else: # # omg so ugly :( -# descr = ga(item, 'desc') +# descr = item.GetAttributes('desc') # descr = descr[0] if len(descr) > 0 else "" # logger.debug('substat is none, descr of bus <%s> is <%s>' % (name, descr)) # else: -# descr = ga(item, 'desc') +# descr = item.GetAttributes('desc') # descr = descr[0] if len(descr) > 0 else "" # logger.debug('no attribute "substat", descr of bus <%s> is <%s>' % (name, descr)) # # try: -# zone = ga(item, 'Grid') -# zone_name = ga(zone, 'loc_name').split('.ElmNet')[0] +# zone = item.GetAttributes('Grid') +# zone_name = zone.GetAttributes('loc_name').split('.ElmNet')[0] # logger.debug('zone "Grid" found: <%s>' % zone_name) # except: -# zone = ga(item, 'cpGrid') -# zone_name = ga(zone, 'loc_name').split('.ElmNet')[0] +# zone = item.GetAttributes('cpGrid') +# zone_name = zone.GetAttributes('loc_name').split('.ElmNet')[0] # logger.debug('zone "cpGrid" found: <%s>' % zone_name) # # temp_df_a = net.bus[net.bus.zone == zone_name] @@ -568,12 +564,12 @@ def get_connection_nodes(net, item, num_nodes): item, pf_class)) if pf_class == "ElmTr2": - v.append(ga(item, 't:utrn_h')) - v.append(ga(item, 't:utrn_l')) + v.append(item.GetAttributes('t:utrn_h')) + v.append(item.GetAttributes('t:utrn_l')) elif pf_class == "ElmTr3": - v.append(ga(item, 't:utrn3_h')) - v.append(ga(item, 't:utrn3_m')) - v.append(ga(item, 't:utrn3_l')) + v.append(item.GetAttributes('t:utrn3_h')) + v.append(item.GetAttributes('t:utrn3_m')) + v.append(item.GetAttributes('t:utrn3_l')) else: v = [net[table].vn_kv.at[existing_bus] for _ in buses] @@ -907,7 +903,7 @@ def create_line_sections(net, item_list, line, bus1, bus2, coords, parallel, is_ item_list.sort(key=lambda x: x.index) # to ensure they are in correct order if line.HasResults(-1): # -1 for 'c' results (whatever that is...) - line_loading = ga(line, 'c:loading') + line_loading = line.GetAttributes('c:loading') else: line_loading = np.nan @@ -1109,7 +1105,7 @@ def get_pf_line_results(net, item, lid, is_unbalanced, ac): for res_var_pp, res_var_pf in result_variables.items(): res = np.nan if item.HasResults(-1): # -1 for 'c' results (whatever that is...) - res = ga(item, res_var_pf) + res = item.GetAttributes(res_var_pf) net[line_type].at[lid, res_var_pp] = res @@ -1241,8 +1237,8 @@ def create_ext_net(net, item, pv_as_slack, is_unbalanced): # if item.HasResults(0): # 'm' results... # # sm:r, sm:i don't work... # logger.debug('<%s> has results' % name) - # net['res_' + elm].at[xid, "pf_p"] = ga(item, 'm:P:bus1') - # net['res_' + elm].at[xid, "pf_q"] = ga(item, 'm:Q:bus1') + # net['res_' + elm].at[xid, "pf_p"] = item.GetAttributes('m:P:bus1') + # net['res_' + elm].at[xid, "pf_q"] = item.GetAttributes('m:Q:bus1') # else: # net['res_' + elm].at[xid, "pf_p"] = np.nan # net['res_' + elm].at[xid, "pf_q"] = np.nan @@ -1279,7 +1275,7 @@ def get_pf_ext_grid_results(net, item, xid, is_unbalanced): for res_var_pp, res_var_pf in result_variables.items(): res = np.nan if item.HasResults(0): - res = ga(item, res_var_pf) + res = item.GetAttributes(res_var_pf) net[ext_grid_type].at[xid, res_var_pp] = res @@ -1369,11 +1365,11 @@ def ask_load_params(item, pf_variable_p_loads, dict_net, variables): if pf_variable_p_loads == 'm:P:bus1' and not item.HasResults(0): raise RuntimeError('load %s does not have results and is ignored' % item.loc_name) if 'p_mw' in variables: - params.p_mw = ga(item, pf_variable_p_loads) * multiplier + params.p_mw = item.GetAttributes(pf_variable_p_loads) * multiplier if 'q_mvar' in variables: - params.q_mvar = ga(item, map_power_var(pf_variable_p_loads, 'q')) * multiplier + params.q_mvar = item.GetAttributes(map_power_var(pf_variable_p_loads, 'q')) * multiplier if 'sn_mva' in variables: - params.sn_mva = ga(item, map_power_var(pf_variable_p_loads, 's')) * multiplier + params.sn_mva = item.GetAttributes(map_power_var(pf_variable_p_loads, 's')) * multiplier kap = -1 if item.pf_recap == 1 else 1 try: @@ -1401,17 +1397,17 @@ def ask_unbalanced_load_params(item, pf_variable_p_loads, dict_net, variables): if pf_variable_p_loads == 'm:P:bus1' and not item.HasResults(0): raise RuntimeError('load %s does not have results and is ignored' % item.loc_name) if 'p_mw' in variables: - params.p_a_mw = ga(item, pf_variable_p_loads + "r") - params.p_b_mw = ga(item, pf_variable_p_loads + "s") - params.p_c_mw = ga(item, pf_variable_p_loads + "t") + params.p_a_mw = item.GetAttributes(pf_variable_p_loads + "r") + params.p_b_mw = item.GetAttributes(pf_variable_p_loads + "s") + params.p_c_mw = item.GetAttributes(pf_variable_p_loads + "t") if 'q_mvar' in variables: - params.q_a_mvar = ga(item, map_power_var(pf_variable_p_loads, 'q') + "r") - params.q_b_mvar = ga(item, map_power_var(pf_variable_p_loads, 'q') + "s") - params.q_c_mvar = ga(item, map_power_var(pf_variable_p_loads, 'q') + "t") + params.q_a_mvar = item.GetAttributes(map_power_var(pf_variable_p_loads, 'q') + "r") + params.q_b_mvar = item.GetAttributes(map_power_var(pf_variable_p_loads, 'q') + "s") + params.q_c_mvar = item.GetAttributes(map_power_var(pf_variable_p_loads, 'q') + "t") if 'sn_mva' in variables: - params.sn_a_mva = ga(item, map_power_var(pf_variable_p_loads, 's') + "r") - params.sn_b_mva = ga(item, map_power_var(pf_variable_p_loads, 's') + "s") - params.sn_c_mva = ga(item, map_power_var(pf_variable_p_loads, 's') + "t") + params.sn_a_mva = item.GetAttributes(map_power_var(pf_variable_p_loads, 's') + "r") + params.sn_b_mva = item.GetAttributes(map_power_var(pf_variable_p_loads, 's') + "s") + params.sn_c_mva = item.GetAttributes(map_power_var(pf_variable_p_loads, 's') + "t") kap = -1 if item.pf_recap == 1 else 1 try: @@ -1666,7 +1662,7 @@ def create_load(net, item, pf_variable_p_loads, dict_net, is_unbalanced): ask = ask_unbalanced_load_params if is_unbalanced else ask_load_params if load_class == 'ElmLodlv': - # if bool(ga(item, 'e:cHasPartLod')): + # if bool(item.GetAttributes('e:cHasPartLod')): # logger.info('ElmLodlv %s has partial loads - skip' % item.loc_name) # part_lods = item.GetContents('*.ElmLodlvp') # logger.debug('%s' % part_lods) @@ -1701,8 +1697,8 @@ def create_load(net, item, pf_variable_p_loads, dict_net, is_unbalanced): i = 0 z = 0 for cc, ee in zip(("aP", "bP", "cP"), ("kpu0", "kpu1", "kpu")): - c = ga(load_type, cc) - e = ga(load_type, ee) + c = load_type.GetAttributes(cc) + e = load_type.GetAttributes(ee) if e == 1: i += 100 * c elif e == 2: @@ -1788,8 +1784,8 @@ def create_load(net, item, pf_variable_p_loads, dict_net, is_unbalanced): # if not is_unbalanced: # if item.HasResults(0): # 'm' results... # logger.debug('<%s> has results' % params.name) - # net["res_load"].at[ld, "pf_p"] = ga(item, 'm:P:bus1') - # net["res_load"].at[ld, "pf_q"] = ga(item, 'm:Q:bus1') + # net["res_load"].at[ld, "pf_p"] = item.GetAttributes('m:P:bus1') + # net["res_load"].at[ld, "pf_q"] = item.GetAttributes('m:Q:bus1') # else: # net["res_load"].at[ld, "pf_p"] = np.nan # net["res_load"].at[ld, "pf_q"] = np.nan @@ -1820,7 +1816,7 @@ def get_pf_load_results(net, item, ld, is_unbalanced): for res_var_pp, res_var_pf in result_variables.items(): res = np.nan if item.HasResults(0): - res = ga(item, res_var_pf) * get_power_multiplier(item, res_var_pf) + res = item.GetAttributes(res_var_pf) * get_power_multiplier(item, res_var_pf) net[load_type].at[ld, res_var_pp] = res @@ -1830,11 +1826,11 @@ def ask_gen_params(item, pf_variable_p_gen, *vars): if pf_variable_p_gen == 'm:P:bus1' and not item.HasResults(0): raise RuntimeError('generator %s does not have results and is ignored' % item.loc_name) if 'p_mw' in vars: - params.p_mw = ga(item, pf_variable_p_gen) * multiplier + params.p_mw = item.GetAttributes(pf_variable_p_gen) * multiplier if 'q_mvar' in vars: - params.q_mvar = ga(item, map_power_var(pf_variable_p_gen, 'q')) * multiplier + params.q_mvar = item.GetAttributes(map_power_var(pf_variable_p_gen, 'q')) * multiplier if 'sn_mva' in vars: - params.sn_mva = ga(item, map_power_var(pf_variable_p_gen, 'sn')) * multiplier + params.sn_mva = item.GetAttributes(map_power_var(pf_variable_p_gen, 'sn')) * multiplier params.scaling = item.scale0 if pf_variable_p_gen == 'pgini' else 1 # p_mw = p_mw, q_mvar = q_mvar, scaling = scaling @@ -1850,25 +1846,25 @@ def ask_unbalanced_sgen_params(item, pf_variable_p_sgen, *vars): technology = item.phtech if technology in [0, 1]: # (0-1: 3PH) if 'p_mw' in vars: - params.p_a_mw = ga(item, pf_variable_p_sgen) / 3 - params.p_b_mw = ga(item, pf_variable_p_sgen) / 3 - params.p_c_mw = ga(item, pf_variable_p_sgen) / 3 + params.p_a_mw = item.GetAttributes(pf_variable_p_sgen) / 3 + params.p_b_mw = item.GetAttributes(pf_variable_p_sgen) / 3 + params.p_c_mw = item.GetAttributes(pf_variable_p_sgen) / 3 if 'q_mvar' in vars: - params.q_a_mvar = ga(item, map_power_var(pf_variable_p_sgen, 'q')) / 3 - params.q_b_mvar = ga(item, map_power_var(pf_variable_p_sgen, 'q')) / 3 - params.q_c_mvar = ga(item, map_power_var(pf_variable_p_sgen, 'q')) / 3 + params.q_a_mvar = item.GetAttributes(map_power_var(pf_variable_p_sgen, 'q')) / 3 + params.q_b_mvar = item.GetAttributes(map_power_var(pf_variable_p_sgen, 'q')) / 3 + params.q_c_mvar = item.GetAttributes(map_power_var(pf_variable_p_sgen, 'q')) / 3 elif technology in [2, 3, 4]: # (2-4: 1PH) if 'p_mw' in vars: - params.p_a_mw = ga(item, pf_variable_p_sgen) + params.p_a_mw = item.GetAttributes(pf_variable_p_sgen) params.p_b_mw = 0 params.p_c_mw = 0 if 'q_mvar' in vars: - params.q_a_mvar = ga(item, map_power_var(pf_variable_p_sgen, 'q')) + params.q_a_mvar = item.GetAttributes(map_power_var(pf_variable_p_sgen, 'q')) params.q_b_mvar = 0 params.q_c_mvar = 0 if 'sn_mva' in vars: - params.sn_mva = ga(item, map_power_var(pf_variable_p_sgen, 's')) + params.sn_mva = item.GetAttributes(map_power_var(pf_variable_p_sgen, 's')) params.scaling = item.scale0 if pf_variable_p_sgen == 'pgini' else 1 return params @@ -2046,7 +2042,7 @@ def get_pf_sgen_results(net, item, sg, is_unbalanced, element='sgen'): res = np.nan if item.HasResults(0): if res_var_pf is not None: - res = ga(item, res_var_pf) * get_power_multiplier(item, res_var_pf) + res = item.GetAttributes(res_var_pf) * get_power_multiplier(item, res_var_pf) else: res = np.nan net[sgen_type].at[sg, res_var_pp] = res @@ -2084,8 +2080,8 @@ def create_sgen_neg_load(net, item, pf_variable_p_loads, dict_net): if item.HasResults(0): # 'm' results... logger.debug('<%s> has results' % params.name) - net.res_sgen.at[sg, "pf_p"] = -ga(item, 'm:P:bus1') - net.res_sgen.at[sg, "pf_q"] = -ga(item, 'm:Q:bus1') + net.res_sgen.at[sg, "pf_p"] = -item.GetAttributes('m:P:bus1') + net.res_sgen.at[sg, "pf_q"] = -item.GetAttributes('m:Q:bus1') else: net.res_sgen.at[sg, "pf_p"] = np.nan net.res_sgen.at[sg, "pf_q"] = np.nan @@ -2184,8 +2180,8 @@ def create_sgen_sym(net, item, pv_as_slack, pf_variable_p_gen, dict_net, export_ if item.HasResults(0): # 'm' results... logger.debug('<%s> has results' % name) - net['res_' + element].at[sid, "pf_p"] = ga(item, 'm:P:bus1') * multiplier - net['res_' + element].at[sid, "pf_q"] = ga(item, 'm:Q:bus1') * multiplier + net['res_' + element].at[sid, "pf_p"] = item.GetAttributes('m:P:bus1') * multiplier + net['res_' + element].at[sid, "pf_q"] = item.GetAttributes('m:Q:bus1') * multiplier else: net['res_' + element].at[sid, "pf_p"] = np.nan net['res_' + element].at[sid, "pf_q"] = np.nan @@ -2199,10 +2195,10 @@ def create_sgen_asm(net, item, pf_variable_p_gen, dict_net): dict_net['global_parameters']['global_generation_scaling'] multiplier = get_power_multiplier(item, pf_variable_p_gen) - p_res = ga(item, 'pgini') * multiplier - q_res = ga(item, 'qgini') * multiplier + p_res = item.GetAttributes('pgini') * multiplier + q_res = item.GetAttributes('qgini') * multiplier if item.HasResults(0): - q_res = ga(item, 'm:Q:bus1') / global_scaling * multiplier + q_res = item.GetAttributes('m:Q:bus1') / global_scaling * multiplier else: logger.warning('reactive power for asynchronous generator is not exported properly ' '(advanced modelling of asynchronous generators not implemented)') @@ -2237,8 +2233,8 @@ def create_sgen_asm(net, item, pf_variable_p_gen, dict_net): attr_list=["sernum", "chr_name", "cpSite.loc_name"]) if item.HasResults(0): - net.res_sgen.at[sid, 'pf_p'] = ga(item, 'm:P:bus1') * multiplier - net.res_sgen.at[sid, 'pf_q'] = ga(item, 'm:Q:bus1') * multiplier + net.res_sgen.at[sid, 'pf_p'] = item.GetAttributes('m:P:bus1') * multiplier + net.res_sgen.at[sid, 'pf_q'] = item.GetAttributes('m:Q:bus1') * multiplier else: net.res_sgen.at[sid, 'pf_p'] = np.nan net.res_sgen.at[sid, 'pf_q'] = np.nan @@ -2349,11 +2345,11 @@ def create_trafo(net, item, export_controller=True, tap_opt="nntap", is_unbalanc tap_pos = np.nan if pf_type.itapch: if tap_opt == "nntap": - tap_pos = ga(item, "nntap") + tap_pos = item.GetAttributes("nntap") logger.debug("got tap %f from nntap" % tap_pos) elif tap_opt == "c:nntap": - tap_pos = ga(item, "c:nntap") + tap_pos = item.GetAttributes("c:nntap") logger.debug("got tap %f from c:nntap" % tap_pos) else: raise ValueError('could not read current tap position: tap_opt = %s' % tap_opt) @@ -2362,9 +2358,9 @@ def create_trafo(net, item, export_controller=True, tap_opt="nntap", is_unbalanc # In PowerFactory, if the first tap changer is absent, the second is also, even if the check was there if pf_type.itapch and pf_type.itapch2: if tap_opt == "nntap": - tap_pos2 = ga(item, "nntap2") + tap_pos2 = item.GetAttributes("nntap2") elif tap_opt == "c:nntap": - tap_pos2 = ga(item, "c:nntap2") + tap_pos2 = item.GetAttributes("c:nntap2") if std_type is not None: tid = pp.create_transformer(net, hv_bus=bus1, lv_bus=bus2, name=name, @@ -2475,7 +2471,7 @@ def get_pf_trafo_results(net, item, tid, is_unbalanced): for res_var_pp, res_var_pf in result_variables.items(): res = np.nan if item.HasResults(-1): # -1 for 'c' results (whatever that is...) - res = ga(item, res_var_pf) + res = item.GetAttributes(res_var_pf) net[trafo_type].at[tid, res_var_pp] = res @@ -2544,21 +2540,21 @@ def create_trafo3w(net, item, tap_opt='nntap'): ts = ["h", "m", "l"][side[0]] # figure out current tap position if tap_opt == "nntap": - tap_pos = ga(item, 'n3tap_' + ts) + tap_pos = item.GetAttributes('n3tap_' + ts) logger.debug("got tap %f from n3tap" % tap_pos) elif tap_opt == "c:nntap": - tap_pos = ga(item, "c:n3tap_" + ts) + tap_pos = item.GetAttributes("c:n3tap_" + ts) logger.debug("got tap %f from c:n3tap" % tap_pos) else: raise ValueError('could not read current tap position: tap_opt = %s' % tap_opt) params.update({ 'tap_side': ts + 'v', # hv, mv, lv - 'tap_step_percent': ga(item, 't:du3tp_' + ts), - 'tap_step_degree': ga(item, 't:ph3tr_' + ts), - 'tap_min': ga(item, 't:n3tmn_' + ts), - 'tap_max': ga(item, 't:n3tmx_' + ts), - 'tap_neutral': ga(item, 't:n3tp0_' + ts), + 'tap_step_percent': item.GetAttributes('t:du3tp_' + ts), + 'tap_step_degree': item.GetAttributes('t:ph3tr_' + ts), + 'tap_min': item.GetAttributes('t:n3tmn_' + ts), + 'tap_max': item.GetAttributes('t:n3tmx_' + ts), + 'tap_neutral': item.GetAttributes('t:n3tp0_' + ts), 'tap_pos': tap_pos }) @@ -2572,7 +2568,7 @@ def create_trafo3w(net, item, tap_opt='nntap'): logger.debug('successfully created trafo3w from parameters: %d' % tid) # testen - # net.trafo3w.loc[tid, 'tap_step_degree'] = ga(item, 't:ph3tr_h') + # net.trafo3w.loc[tid, 'tap_step_degree'] = item.GetAttributes('t:ph3tr_h') # adding switches # False if open, True if closed, None if no switch @@ -2587,7 +2583,7 @@ def create_trafo3w(net, item, tap_opt='nntap'): # assign loading from power factory results if item.HasResults(-1): # -1 for 'c' results (whatever that is...) logger.debug('trafo3w <%s> has results' % item.loc_name) - loading = ga(item, 'c:loading') + loading = item.GetAttributes('c:loading') net.res_trafo3w.at[tid, "pf_loading"] = loading else: net.res_trafo3w.at[tid, "pf_loading"] = np.nan @@ -2597,12 +2593,12 @@ def create_trafo3w(net, item, tap_opt='nntap'): if pf_type.itapzdep: x_points = (net.trafo3w.at[tid, "tap_min"], net.trafo3w.at[tid, "tap_neutral"], net.trafo3w.at[tid, "tap_max"]) for side in ("hv", "mv", "lv"): - vk_min = ga(pf_type, f"uktr3mn_{side[0]}") + vk_min = pf_type.GetAttributes(f"uktr3mn_{side[0]}") vk_neutral = net.trafo3w.at[tid, f"vk_{side}_percent"] - vk_max = ga(pf_type, f"uktr3mx_{side[0]}") - vkr_min = ga(pf_type, f"uktrr3mn_{side[0]}") + vk_max = pf_type.GetAttributes(f"uktr3mx_{side[0]}") + vkr_min = pf_type.GetAttributes(f"uktrr3mn_{side[0]}") vkr_neutral = net.trafo3w.at[tid, f"vkr_{side}_percent"] - vkr_max = ga(pf_type, f"uktrr3mx_{side[0]}") + vkr_max = pf_type.GetAttributes(f"uktrr3mx_{side[0]}") # todo zero-sequence parameters (must be implemented in build_branch first) pp.control.create_trafo_characteristics(net, trafotable="trafo3w", trafo_index=tid, variable=f"vk_{side}_percent", x_points=x_points, @@ -2658,7 +2654,7 @@ def create_coup(net, item, is_fuse=False): # # false approach, completely irrelevant # def create_switch(net, item): # switch_types = {"cbk": "CB", "sdc": "LBS", "swt": "LS", "dct": "DS"} -# name = ga(item, 'loc_name') +# name = item.GetAttributes('loc_name') # logger.debug('>> creating switch <%s>' % name) # # pf_bus1 = item.GetNode(0) @@ -2673,8 +2669,8 @@ def create_coup(net, item, is_fuse=False): # bus2 = find_bus_index_in_net(pf_bus2, net) # logger.debug('switch %s connects buses <%d> and <%d>' % (name, bus1, bus2)) # -# switch_is_closed = bool(ga(item, 'on_off')) -# switch_usage = switch_types[ga(item, 'aUsage')] +# switch_is_closed = bool(item.GetAttributes('on_off')) +# switch_usage = switch_types[item.GetAttributes('aUsage')] # # cd = pp.create_switch(net, name=name, bus=bus1, element=bus2, et='b', # closed=switch_is_closed, type=switch_usage) @@ -2780,8 +2776,8 @@ def create_shunt(net, item): attr_list=['cpSite.loc_name'], attr_dict={"cimRdfId": "origin_id"}) if item.HasResults(0): - net.res_shunt.loc[sid, 'pf_p'] = ga(item, 'm:P:bus1') * multiplier - net.res_shunt.loc[sid, 'pf_q'] = ga(item, 'm:Q:bus1') * multiplier + net.res_shunt.loc[sid, 'pf_p'] = item.GetAttributes('m:P:bus1') * multiplier + net.res_shunt.loc[sid, 'pf_q'] = item.GetAttributes('m:Q:bus1') * multiplier else: net.res_shunt.loc[sid, 'pf_p'] = np.nan net.res_shunt.loc[sid, 'pf_q'] = np.nan @@ -2886,8 +2882,8 @@ def create_vac(net, item): params['name'], item.itype)) if item.HasResults(0): # -1 for 'c' results (whatever that is...) - net['res_%s' % elm].at[xid, "pf_p"] = -ga(item, 'm:P:bus1') - net['res_%s' % elm].at[xid, "pf_q"] = -ga(item, 'm:Q:bus1') + net['res_%s' % elm].at[xid, "pf_p"] = -item.GetAttributes('m:P:bus1') + net['res_%s' % elm].at[xid, "pf_q"] = -item.GetAttributes('m:Q:bus1') else: net['res_%s' % elm].at[xid, "pf_p"] = np.nan net['res_%s' % elm].at[xid, "pf_q"] = np.nan @@ -2954,8 +2950,8 @@ def _get_vsc_control_modes(item, mono=True): f" {item.loc_name} not implemented: {c_m}") if item.HasResults(0): - p_set_dc = -ga(item, f"m:P:{dc_bus_str}") - q_set_ac = -ga(item, "m:Q:busac") * scaling + p_set_dc = -item.GetAttributes(f"m:P:{dc_bus_str}") + q_set_ac = -item.GetAttributes("m:Q:busac") * scaling else: p_set_dc = -item.psetp * scaling # does not work - in PowerFactory, the P set-point relates to AC side q_set_ac = -item.qsetp * scaling @@ -3022,7 +3018,7 @@ def create_vscmono(net, item): for res_var_pp, res_var_pf in result_variables.items(): res = np.nan if item.HasResults(0): - res = ga(item, res_var_pf) + res = item.GetAttributes(res_var_pf) net.res_vsc.at[vid, res_var_pp] = -res @@ -3070,11 +3066,11 @@ def create_vsc(net, item): if item.HasResults(0): for res_var_pp, res_var_pf in result_variables.items(): - res = ga(item, res_var_pf) + res = item.GetAttributes(res_var_pf) net.res_vsc.at[vid_1, res_var_pp] = -res / 2 net.res_vsc.at[vid_2, res_var_pp] = -res / 2 - net.res_vsc.at[vid_1, "pf_p_dc_mw"] = -ga(item, "m:P:busdm") - net.res_vsc.at[vid_2, "pf_p_dc_mw"] = -ga(item, "m:P:busdp") + net.res_vsc.at[vid_1, "pf_p_dc_mw"] = -item.GetAttributes("m:P:busdm") + net.res_vsc.at[vid_2, "pf_p_dc_mw"] = -item.GetAttributes("m:P:busdp") else: net.res_vsc.loc[vid_1, ["pf_p_mw", "pf_q_mvar", "pf_p_dc_mw"]] = np.nan net.res_vsc.loc[vid_2, ["pf_p_mw", "pf_q_mvar", "pf_p_dc_mw"]] = np.nan From 26dbbb02a38a27213780dd6a097f0b1d2ed17b67 Mon Sep 17 00:00:00 2001 From: hkoertge Date: Mon, 25 Nov 2024 11:51:05 +0100 Subject: [PATCH 3/5] fixed formatting issues switched to f-strings instead of % notation added some type hints and docstrings --- .../powerfactory/pp_import_functions.py | 243 +++++++++++------- 1 file changed, 152 insertions(+), 91 deletions(-) diff --git a/pandapower/converter/powerfactory/pp_import_functions.py b/pandapower/converter/powerfactory/pp_import_functions.py index 472f6445a..6fb6747d2 100644 --- a/pandapower/converter/powerfactory/pp_import_functions.py +++ b/pandapower/converter/powerfactory/pp_import_functions.py @@ -31,10 +31,19 @@ # import network to pandapower: -def from_pf(dict_net, pv_as_slack=True, pf_variable_p_loads='plini', pf_variable_p_gen='pgini', - flag_graphics='GPS', tap_opt="nntap", export_controller=True, handle_us="Deactivate", - max_iter=None, is_unbalanced=False, create_sections=True): - global line_dict +def from_pf( + dict_net, + pv_as_slack=True, + pf_variable_p_loads='plini', + pf_variable_p_gen='pgini', + flag_graphics: Literal["GPS", "no geodata"] = 'GPS', + tap_opt="nntap", + export_controller=True, + handle_us: Literal["Deactivate", "Drop", "Nothing"] = "Deactivate", + max_iter=None, + is_unbalanced=False, + create_sections=True +): global line_dict, trafo_dict, switch_dict, bus_dict, grf_map line_dict = {} trafo_dict = {} @@ -448,12 +457,12 @@ def get_pf_bus_results(net, item, bid, is_unbalanced, system_type): if is_unbalanced: bus_type = "res_bus_3ph" result_variables = { - "pf_vm_a_pu": "m:u:A", - "pf_va_a_degree": "m:phiu:A", - "pf_vm_b_pu": "m:u:B", - "pf_va_b_degree": "m:phiu:B", - "pf_vm_c_pu": "m:u:C", - "pf_va_c_degree": "m:phiu:C", + "pf_vm_a_pu": "m:u:A", + "pf_va_a_degree": "m:phiu:A", + "pf_vm_b_pu": "m:u:B", + "pf_va_b_degree": "m:phiu:B", + "pf_vm_c_pu": "m:u:C", + "pf_va_c_degree": "m:phiu:C", } elif system_type == "ac": bus_type = "res_bus" @@ -751,13 +760,21 @@ def create_line(net, item, flag_graphics, create_sections, is_unbalanced): logger.debug('line <%s> created' % params['name']) -def point_len(p1, p2): +def point_len( + p1: tuple[Union[float, int], Union[float, int]], + p2: tuple[Union[float, int], Union[float, int]]) -> float: + """ + Calculate distance between p1 and p2 + """ x1, y1 = p1 x2, y2 = p2 return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5 -def calc_len_coords(coords): +def calc_len_coords(coords: list[tuple[Union[float, int], Union[float, int]]]) -> float: + """ + Calculate the sum of point distances in list of coords + """ tot_len = 0 for i in range(len(coords) - 1): tot_len += point_len(coords[i], coords[i + 1]) @@ -873,7 +890,7 @@ def segment_buses(net, bus1, bus2, num_sections, line_name): # , sec_len, start # split_len = 0 while m < num_sections: - bus_name = "%s (Muff %u)" % (line_name, m) + bus_name = f"{line_name} (Muff {m})" vn_kv = net.bus.at[bus1, "vn_kv"] zone = net.bus.at[bus1, "zone"] bus = pp.create_bus(net, name=bus_name, type='ls', vn_kv=vn_kv, zone=zone) @@ -1085,15 +1102,15 @@ def get_pf_line_results(net, item, lid, is_unbalanced, ac): if is_unbalanced: line_type = "res_line_3ph" result_variables = { - "pf_i_a_from_ka": "m:I:bus1:A", - "pf_i_a_to_ka": "m:I:bus2:A", - "pf_i_b_from_ka": "m:I:bus1:B", - "pf_i_b_to_ka": "m:I:bus2:B", - "pf_i_c_from_ka": "m:I:bus1:C", - "pf_i_c_to_ka": "m:I:bus2:C", - "pf_i_n_from_ka": "m:I0x3:bus1", - "pf_i_n_to_ka": "m:I0x3:bus2", - "pf_loading_percent": "c:loading", + "pf_i_a_from_ka": "m:I:bus1:A", + "pf_i_a_to_ka": "m:I:bus2:A", + "pf_i_b_from_ka": "m:I:bus1:B", + "pf_i_b_to_ka": "m:I:bus2:B", + "pf_i_c_from_ka": "m:I:bus1:C", + "pf_i_c_to_ka": "m:I:bus2:C", + "pf_i_n_from_ka": "m:I0x3:bus1", + "pf_i_n_to_ka": "m:I0x3:bus2", + "pf_loading_percent": "c:loading", } elif ac: line_type = "res_line" @@ -1133,14 +1150,14 @@ def create_line_type(net, item, cable_in_air=False): type_data = { "r_ohm_per_km": item.rline, "x_ohm_per_km": item.xline, - "c_nf_per_km": item.cline*item.frnom/50 * 1e3, # internal unit for C in PF is uF + "c_nf_per_km": item.cline * item.frnom / 50 * 1e3, # internal unit for C in PF is uF "q_mm2": item.qurs, "max_i_ka": max_i_ka if max_i_ka != 0 else 1e-3, "endtemp_degree": item.rtemp, "type": line_or_cable, "r0_ohm_per_km": item.rline0, "x0_ohm_per_km": item.xline0, - "c0_nf_per_km": item.cline0*item.frnom/50 * 1e3, # internal unit for C in PF is uF + "c0_nf_per_km": item.cline0 * item.frnom / 50 * 1e3, # internal unit for C in PF is uF "alpha": item.alpha } pp.create_std_type(net, type_data, name, "line") @@ -1258,12 +1275,12 @@ def get_pf_ext_grid_results(net, item, xid, is_unbalanced): if is_unbalanced: ext_grid_type = "res_ext_grid_3ph" result_variables = { - "pf_p_a": "m:P:bus1:A", - "pf_q_a": "m:Q:bus1:A", - "pf_p_b": "m:P:bus1:B", - "pf_q_b": "m:Q:bus1:B", - "pf_p_c": "m:P:bus1:C", - "pf_q_c": "m:Q:bus1:C", + "pf_p_a": "m:P:bus1:A", + "pf_q_a": "m:Q:bus1:A", + "pf_p_b": "m:P:bus1:B", + "pf_q_b": "m:Q:bus1:B", + "pf_p_c": "m:P:bus1:C", + "pf_q_c": "m:Q:bus1:C", } else: ext_grid_type = "res_ext_grid" @@ -1894,7 +1911,7 @@ def create_sgen_genstat(net, item, pv_as_slack, pf_variable_p_gen, dict_net, is_ return params.update(ask(item, pf_variable_p_gen, 'p_mw', 'q_mvar', 'sn_mva')) - logger.debug('genstat parameters: ' % params) + logger.debug(f'genstat parameters: {params}') params.in_service = monopolar_in_service(item) @@ -2371,16 +2388,31 @@ def create_trafo(net, item, export_controller=True, tap_opt="nntap", is_unbalanc logger.debug('created trafo at index <%d>' % tid) else: logger.info("Create Trafo 3ph") - tid = pp.create_transformer_from_parameters(net, hv_bus=bus1, lv_bus=bus2, name=name, - tap_pos=tap_pos, - in_service=in_service, parallel=item.ntnum, df=item.ratfac, - sn_mva=pf_type.strn, vn_hv_kv=pf_type.utrn_h, vn_lv_kv=pf_type.utrn_l, - vk_percent=pf_type.uktr, vkr_percent=pf_type.uktrr, - pfe_kw=pf_type.pfe, i0_percent=pf_type.curmg, - vector_group=pf_type.vecgrp[:-1], vk0_percent=pf_type.uk0tr, - vkr0_percent=pf_type.ur0tr, mag0_percent=pf_type.zx0hl_n, - mag0_rx=pf_type.rtox0_n, si0_hv_partial=pf_type.zx0hl_h, - shift_degree=pf_type.nt2ag * 30, tap2_pos=tap_pos2) + tid = pp.create_transformer_from_parameters( + net, + hv_bus=bus1, + lv_bus=bus2, + name=name, + tap_pos=tap_pos, + in_service=in_service, + parallel=item.ntnum, + df=item.ratfac, + sn_mva=pf_type.strn, + vn_hv_kv=pf_type.utrn_h, + vn_lv_kv=pf_type.utrn_l, + vk_percent=pf_type.uktr, + vkr_percent=pf_type.uktrr, + pfe_kw=pf_type.pfe, + i0_percent=pf_type.curmg, + vector_group=pf_type.vecgrp[:-1], + vk0_percent=pf_type.uk0tr, + vkr0_percent=pf_type.ur0tr, + mag0_percent=pf_type.zx0hl_n, + mag0_rx=pf_type.rtox0_n, + si0_hv_partial=pf_type.zx0hl_h, + shift_degree=pf_type.nt2ag * 30, + tap2_pos=tap_pos2 + ) trafo_dict[item] = tid # add value for voltage setpoint @@ -2452,15 +2484,15 @@ def get_pf_trafo_results(net, item, tid, is_unbalanced): if is_unbalanced: trafo_type = "res_trafo_3ph" result_variables = { - "pf_i_a_hv_ka": "m:I:bushv:A", - "pf_i_a_lv_ka": "m:I:buslv:A", - "pf_i_b_hv_ka": "m:I:bushv:B", - "pf_i_b_lv_ka": "m:I:buslv:B", - "pf_i_c_hv_ka": "m:I:bushv:C", - "pf_i_c_lv_ka": "m:I:buslv:C", - "pf_i_n_hv_ka": "m:I0x3:bushv", - "pf_i_n_lv_ka": "m:I0x3:buslv", - "pf_loading_percent": "c:loading", + "pf_i_a_hv_ka": "m:I:bushv:A", + "pf_i_a_lv_ka": "m:I:buslv:A", + "pf_i_b_hv_ka": "m:I:bushv:B", + "pf_i_b_lv_ka": "m:I:buslv:B", + "pf_i_c_hv_ka": "m:I:bushv:C", + "pf_i_c_lv_ka": "m:I:buslv:C", + "pf_i_n_hv_ka": "m:I0x3:bushv", + "pf_i_n_lv_ka": "m:I0x3:buslv", + "pf_loading_percent": "c:loading", } else: trafo_type = "res_trafo" @@ -2912,6 +2944,7 @@ def create_sind(net, item): logger.debug('created series reactor %s as per unit impedance at index %d' % (net.impedance.at[sind, 'name'], sind)) + def create_scap(net, item): # series capacitor is modelled as per-unit impedance, values in Ohm are calculated into values in # per unit at creation @@ -2921,11 +2954,11 @@ def create_scap(net, item): logger.error("Cannot add Scap '%s': not connected" % item.loc_name) return - if (item.gcap==0) or (item.bcap==0): + if (item.gcap == 0) or (item.bcap == 0): logger.info('not creating series capacitor for %s' % item.loc_name) else: - r_ohm = item.gcap/(item.gcap**2 + item.bcap**2) - x_ohm = -item.bcap/(item.gcap**2 + item.bcap**2) + r_ohm = item.gcap / (item.gcap ** 2 + item.bcap ** 2) + x_ohm = -item.bcap / (item.gcap ** 2 + item.bcap ** 2) scap = pp.create_series_reactor_as_impedance(net, from_bus=bus1, to_bus=bus2, r_ohm=r_ohm, x_ohm=x_ohm, sn_mva=item.Sn, name=item.loc_name, @@ -2971,7 +3004,6 @@ def _get_vsc_control_modes(item, mono=True): def create_vscmono(net, item): - (bus, bus_dc), _ = get_connection_nodes(net, item, 2) sn_mva = item.Snom @@ -3006,7 +3038,9 @@ def create_vscmono(net, item): } if params["r_dc_ohm"] == 0: - logger.warning(f"VSCmono element {params['name']} has no DC resistive loss factor - power flow will not converge!") + logger.warning( + f"VSCmono element {params['name']} has no DC resistive loss factor - power flow will not converge!" + ) vid = pp.create_vsc(net, **params) logger.debug(f'created VSC {vid} for vscmono {item.loc_name}') @@ -3076,12 +3110,10 @@ def create_vsc(net, item): net.res_vsc.loc[vid_2, ["pf_p_mw", "pf_q_mvar", "pf_p_dc_mw"]] = np.nan - def create_stactrl(net, item): stactrl_in_service = True if item.outserv: logger.info(f"Station controller {item.loc_name} is out of service") - stactrl_in_service = False return machines = [m for m in item.psym if m is not None] @@ -3237,15 +3269,15 @@ def create_stactrl(net, item): if not has_path and not control_mode == 0 and not item.i_droop: return - if control_mode == 0: #### VOLTAGE CONTROL + if control_mode == 0: # VOLTAGE CONTROL # controlled_node = item.rembar controlled_node = item.cpCtrlNode bus = bus_dict[controlled_node] # controlled node - if item.uset_mode == 0: #### Station controller + if item.uset_mode == 0: # Station controller v_setpoint_pu = item.usetp else: - v_setpoint_pu = controlled_node.vtarget #### Bus target voltage + v_setpoint_pu = controlled_node.vtarget # Bus target voltage if item.i_droop: # Enable Droop bsc = pp.control.BinarySearchControl(net, ctrl_in_service=stactrl_in_service, @@ -3274,28 +3306,49 @@ def create_stactrl(net, item): # q_control_mode = item.qu_char # 0: "Const Q", 1: "Q(V) Characteristic", 2: "Q(P) Characteristic" # q_control_terminal = q_control_cubicle.cterm # terminal of the cubicle if item.qu_char == 0: - pp.control.BinarySearchControl(net, ctrl_in_service=stactrl_in_service, - output_element=gen_element, output_variable="q_mvar", - output_element_index=gen_element_index, - output_element_in_service=gen_element_in_service, - input_element=res_element_table, - output_values_distribution=distribution, damping_factor=0.9, - input_variable=variable, input_element_index=res_element_index, - set_point=item.qsetp, voltage_ctrl=False, tol=1e-6) + pp.control.BinarySearchControl( + net, ctrl_in_service=stactrl_in_service, + output_element=gen_element, + output_variable="q_mvar", + output_element_index=gen_element_index, + output_element_in_service=gen_element_in_service, + input_element=res_element_table, + output_values_distribution=distribution, + damping_factor=0.9, + input_variable=variable, + input_element_index=res_element_index, + set_point=item.qsetp, + voltage_ctrl=False, tol=1e-6 + ) elif item.qu_char == 1: controlled_node = item.refbar bus = bus_dict[controlled_node] # controlled node - bsc = pp.control.BinarySearchControl(net, ctrl_in_service=stactrl_in_service, - output_element=gen_element, output_variable="q_mvar", - output_element_index=gen_element_index, - output_element_in_service=gen_element_in_service, - input_element=res_element_table, - output_values_distribution=distribution, damping_factor=0.9, - input_variable=variable, input_element_index=res_element_index, - set_point=item.qsetp, voltage_ctrl=False, bus_idx=bus, tol=1e-6) - pp.control.DroopControl(net, q_droop_mvar=item.Srated * 100 / item.ddroop, bus_idx=bus, - vm_set_pu=item.udeadbup, vm_set_ub=item.udeadbup, vm_set_lb=item.udeadblow, - controller_idx=bsc.index, voltage_ctrl=False) + bsc = pp.control.BinarySearchControl( + net, ctrl_in_service=stactrl_in_service, + output_element=gen_element, + output_variable="q_mvar", + output_element_index=gen_element_index, + output_element_in_service=gen_element_in_service, + input_element=res_element_table, + output_values_distribution=distribution, + damping_factor=0.9, + input_variable=variable, + input_element_index=res_element_index, + set_point=item.qsetp, + voltage_ctrl=False, + bus_idx=bus, + tol=1e-6 + ) + pp.control.DroopControl( + net, + q_droop_mvar=item.Srated * 100 / item.ddroop, + bus_idx=bus, + vm_set_pu=item.udeadbup, + vm_set_ub=item.udeadbup, + vm_set_lb=item.udeadblow, + controller_idx=bsc.index, + voltage_ctrl=False + ) else: raise NotImplementedError else: @@ -3322,10 +3375,17 @@ def split_line_at_length(net, line, length_pos): std_type = net.line.at[line, 'std_type'] name = net.line.at[line, 'name'] - new_line = pp.create_line(net, from_bus=bus, to_bus=bus2, length_km=new_length, - std_type=std_type, name=name, df=net.line.at[line, 'df'], - parallel=net.line.at[line, 'parallel'], - in_service=net.line.at[line, 'in_service']) + new_line = pp.create_line( + net, + from_bus=bus, + to_bus=bus2, + length_km=new_length, + std_type=std_type, + name=name, + df=net.line.at[line, 'df'], + parallel=net.line.at[line, 'parallel'], + in_service=net.line.at[line, 'in_service'] + ) if 'max_loading_percent' in net.line.columns: net.line.loc[new_line, 'max_loading_percent'] = net.line.at[line, 'max_loading_percent'] @@ -3334,8 +3394,10 @@ def split_line_at_length(net, line, length_pos): coords = geojson.utils.coords(geojson.loads(net.line.loc[line, 'geo'])) scaling_factor = old_length / calc_len_coords(coords) - sec_coords_a = get_section_coords(coords, sec_len=length_pos, start_len=0., - scale_factor=scaling_factor) + sec_coords_a = get_section_coords(coords, sec_len=length_pos, start_len=0., scale_factor=scaling_factor) + sec_coords_b = get_section_coords( + coords, sec_len=new_length, start_len=length_pos, scale_factor=scaling_factor + ) net.line.loc[line, 'geo'] = geojson.dumps(geojson.LineString(sec_coords_a)) net.line.loc[new_line, 'geo'] = geojson.dumps(geojson.LineString(sec_coords_b)) @@ -3349,16 +3411,14 @@ def get_lodlvp_length_pos(line_item, lod_item): sections = line_item.GetContents('*.ElmLnesec') if len(sections) > 0: sections.sort(lambda x: x.index) - sections_start = [s.rellen for s in sections] sections_end = [s.rellen + s.dline for s in sections] else: - sections_start = [0] sections_end = [line_item.dline] loads = line_item.GetContents('*.ElmLodlvp') if len(loads) > 0: loads.sort(lambda x: x.rellen) - loads_start = [l.rellen for l in loads] + loads_start = [load.rellen for load in loads] else: loads_start = [0] @@ -3403,7 +3463,8 @@ def split_line(net, line_idx, pos_at_line, line_item): return bus_j elif (pos_at_line - line_length) > tol: raise ValueError( - 'Position at line is higher than the line length itself! Line length: %.7f, position at line: %.7f (line: \n%s)' % ( + 'Position at line is higher than the line length itself!\ + Line length: %.7f, position at line: %.7f (line: \n%s)' % ( # line_length, pos_at_line, line_item.loc_name)) line_length, pos_at_line, net.line.loc[line_dict[line_item]])) else: @@ -3661,7 +3722,7 @@ def split_all_lines(net, lvp_dict): # val = [(92, 1, 0.025, 0.1), (91, 2, 0.031, 0.2), (90, 2, 0.032, 0.3)] for load_item, pos_at_line, (p, q) in val: logger.debug(load_item) - ## calculate at once and then read from dict - not good approach! don't do it + # calculate at once and then read from dict - not good approach! don't do it # section, pos_at_sec = get_pos_at_sec(net, net_dgs, lvp_dict, line, load_idx) # section = pas[load_idx]['section'] # pos_at_sec = pas[load_idx]['pos'] @@ -3683,7 +3744,7 @@ def split_all_lines(net, lvp_dict): net.res_load.at[new_load, 'pf_p'] = p net.res_load.at[new_load, 'pf_q'] = q else: - # const I not implemented for sgen... + # const I is not implemented for sgen new_load = pp.create_sgen(net, new_bus, name=load_item.loc_name, p_mw=p, q_mvar=q) logger.debug('created sgen %s' % new_load) net.res_sgen.at[new_load, 'pf_p'] = p From ef62aa295f616ce51fe4f8654f0c0196e50663e2 Mon Sep 17 00:00:00 2001 From: hkoertge Date: Mon, 25 Nov 2024 11:52:50 +0100 Subject: [PATCH 4/5] rewrote calculation to remove redundant calls renamed vars to small letters as per python naming convention --- .../powerfactory/pp_import_functions.py | 109 ++++++++---------- 1 file changed, 45 insertions(+), 64 deletions(-) diff --git a/pandapower/converter/powerfactory/pp_import_functions.py b/pandapower/converter/powerfactory/pp_import_functions.py index 6fb6747d2..e6297797c 100644 --- a/pandapower/converter/powerfactory/pp_import_functions.py +++ b/pandapower/converter/powerfactory/pp_import_functions.py @@ -2717,6 +2717,12 @@ def create_shunt(net, item): logger.error("Cannot add Shunt '%s': not connected" % item.loc_name) return + def calc_p_mw_and_q_mvar(r: float, x: float) -> tuple[float, float]: + if r == 0 and x == 0: + return 0, 0 + divisor: float = (r ** 2 + x ** 2) + return (item.ushnm ** 2 * r) / divisor * multiplier, (item.ushnm ** 2 * x) / divisor * multiplier + multiplier = get_power_multiplier(item, 'Qact') bus, _ = get_connection_nodes(net, item, 1) params = { @@ -2729,83 +2735,58 @@ def create_shunt(net, item): 'max_step': item.ncapx } print(item.loc_name) + r_val: float = .0 + x_val: float = .0 if item.shtype == 0: # Shunt is a R-L-C element - - R = item.rrea - X = -1e6 / item.bcap + item.xrea - if R == 0 and X == 0: #TODO put this into one function - p_mw = 0 - params['q_mvar'] = 0 - else: - p_mw = (item.ushnm ** 2 * R) / (R ** 2 + X ** 2) * multiplier - params['q_mvar'] = (item.ushnm ** 2 * X) / (R ** 2 + X ** 2) * multiplier - sid = pp.create_shunt(net, p_mw=p_mw, **params) + r_val = item.rrea + x_val = -1e6 / item.bcap + item.xrea elif item.shtype == 1: # Shunt is an R-L element - - R = item.rrea - X = item.xrea - if R == 0 and X == 0: #TODO put this into one function - p_mw = 0 - params['q_mvar'] = 0 - else: - p_mw = (item.ushnm ** 2 * R) / (R ** 2 + X ** 2) * multiplier - params['q_mvar'] = (item.ushnm ** 2 * X) / (R ** 2 + X ** 2) * multiplier - sid = pp.create_shunt(net, p_mw=p_mw, **params) + r_val = item.rrea + x_val = item.xrea elif item.shtype == 2: # Shunt is a capacitor bank - B = item.bcap*1e-6 - G = item.gparac*1e-6 - - R = G/(G**2 + B**2) - X = -B/(G**2 + B**2) - if R == 0 and X == 0: #TODO put this into one function - p_mw = 0 - params['q_mvar'] = 0 - else: - p_mw = (item.ushnm ** 2 * R) / (R ** 2 + X ** 2) * multiplier - params['q_mvar'] = (item.ushnm ** 2 * X) / (R ** 2 + X ** 2) * multiplier - sid = pp.create_shunt(net, p_mw=p_mw, **params) + b = item.bcap*1e-6 + g = item.gparac*1e-6 + + r_val = g / (g ** 2 + b ** 2) + x_val = -b / (g ** 2 + b ** 2) elif item.shtype == 3: # Shunt is a R-L-C, Rp element + rp = item.rpara + rs = item.rrea + xl = item.xrea + bc = -item.bcap * 1e-6 - Rp = item.rpara - Rs = item.rrea - Xl = item.xrea - Bc = -item.bcap * 1e-6 - - R = Rp * (Rp * Rs + Rs ** 2 + Xl ** 2) / ((Rp + Rs) ** 2 + Xl ** 2) - X = 1 / Bc + (Xl * Rp ** 2) / ((Rp + Rs) ** 2 + Xl ** 2) - if R == 0 and X == 0: #TODO put this into one function - p_mw = 0 - params['q_mvar'] = 0 - else: - p_mw = (item.ushnm ** 2 * R) / (R ** 2 + X ** 2) * multiplier - params['q_mvar'] = (item.ushnm ** 2 * X) / (R ** 2 + X ** 2) * multiplier - sid = pp.create_shunt(net, p_mw=p_mw, **params) + r_val = rp * (rp * rs + rs ** 2 + xl ** 2) / ((rp + rs) ** 2 + xl ** 2) + x_val = 1 / bc + (xl * rp ** 2) / ((rp + rs) ** 2 + xl ** 2) elif item.shtype == 4: # Shunt is a R-L-C1-C2, Rp element - - Rp = item.rpara - Rs = item.rrea - Xl = item.xrea - B1 = 2 * np.pi * 50 * item.c1 * 1e-6 - B2 = 2 * np.pi * 50 * item.c2 * 1e-6 - - Z = Rp * (Rs + 1j * (Xl - 1 / B1)) / (Rp + Rs + 1j * (Xl - 1 / B1)) - 1j / B2 - R = np.real(Z) - X = np.imag(Z) - if R == 0 and X == 0: #TODO put this into one function - p_mw = 0 - params['q_mvar'] = 0 - else: - p_mw = (item.ushnm ** 2 * R) / (R ** 2 + X ** 2) * multiplier - params['q_mvar'] = (item.ushnm ** 2 * X) / (R ** 2 + X ** 2) * multiplier + rp = item.rpara + rs = item.rrea + xl = item.xrea + b1 = 2 * np.pi * 50 * item.c1 * 1e-6 + b2 = 2 * np.pi * 50 * item.c2 * 1e-6 + + z = rp * (rs + 1j * (xl - 1 / b1)) / (rp + rs + 1j * (xl - 1 / b1)) - 1j / b2 + r_val = np.real(z) + x_val = np.imag(z) + + if 0 <= item.shtype <= 4: + p_mw, params['q_mvar'] = calc_p_mw_and_q_mvar(r_val, x_val) sid = pp.create_shunt(net, p_mw=p_mw, **params) - add_additional_attributes(item, net, element='shunt', element_id=sid, - attr_list=['cpSite.loc_name'], attr_dict={"cimRdfId": "origin_id"}) + add_additional_attributes( + item, + net, + element='shunt', + element_id=sid, + attr_list=['cpSite.loc_name'], + attr_dict={"cimRdfId": "origin_id"} + ) + else: + raise AttributeError(f"Shunt type {item.shtype} not valid: {item}") if item.HasResults(0): net.res_shunt.loc[sid, 'pf_p'] = item.GetAttributes('m:P:bus1') * multiplier From ce45bd044736fad21f204e944813c9a34be53aee Mon Sep 17 00:00:00 2001 From: hkoertge Date: Tue, 26 Nov 2024 11:35:36 +0100 Subject: [PATCH 5/5] fixed us of net.bus_geodata and net.line_geodata reformatted file simplified some functions renamed some capitalized variables --- pandapower/protection/utility_functions.py | 328 ++++++++------------- 1 file changed, 120 insertions(+), 208 deletions(-) diff --git a/pandapower/protection/utility_functions.py b/pandapower/protection/utility_functions.py index 7ae26364e..b1f3ad034 100644 --- a/pandapower/protection/utility_functions.py +++ b/pandapower/protection/utility_functions.py @@ -1,8 +1,9 @@ # This function includes various function used for general functionalities such as plotting, grid search import copy -from typing import overload, List, Tuple +from typing import List, Tuple +from matplotlib.collections import PatchCollection from typing_extensions import deprecated import geojson @@ -16,6 +17,7 @@ import pandapower as pp import pandapower.plotting as plot +from pandapower import pandapowerNet from pandapower.topology.create_graph import create_nxgraph import warnings @@ -122,7 +124,8 @@ def create_sc_bus(net_copy, sc_line_id, sc_fraction): x1, y1 = _get_coords_from_bus_idx(net, aux_line.from_bus)[0] x2, y2 = _get_coords_from_bus_idx(net, aux_line.to_bus)[0] - net.bus.geo.at[max_idx_bus + 1] = geojson.dumps(geojson.Point((sc_fraction * (x2 - x1) + x1, sc_fraction * (y2 - y1) + y1)), sort_keys=True) + net.bus.geo.at[max_idx_bus + 1] = geojson.dumps( + geojson.Point((sc_fraction * (x2 - x1) + x1, sc_fraction * (y2 - y1) + y1)), sort_keys=True) return net @@ -145,21 +148,9 @@ def calc_faults_at_full_line(net, line, location_step_size=0.01, start_location= return fault_currents -def get_line_idx(net, switch_id): - # get the line id from swithc id - line_idx = net.switch.element.at[switch_id] - return line_idx - - -def get_bus_idx(net, switch_id): - # get the bus id using switch if - bus_idx = net.switch.bus.at[switch_id] - return bus_idx - - def get_opposite_side_bus_from_switch(net, switch_id): # get the frm and to bus of switch - line_idx = get_line_idx(net, switch_id) + line_idx = net.switch.element.at[switch_id] is_from_bus = get_from_bus_info_switch(net, switch_id) if is_from_bus: @@ -184,33 +175,15 @@ def get_opposite_side_bus_from_bus_line(net, bus_idx, line_idx): def get_from_bus_info_switch(net, switch_id): # get the from bus of given switch id - bus_idx = get_bus_idx(net, switch_id) - line_idx = get_line_idx(net, switch_id) - - for line in net.line.index: # can be written better - if line == line_idx: - if bus_idx == net.line.from_bus.at[line_idx]: # asks if switch is at from_bus - is_from_bus = True - # sc_fraction = 0.95 - else: # else it is at to_bus - is_from_bus = False - # sc_fraction = 0.05 + bus_idx = net.switch.bus.at[switch_id] + line_idx = net.switch.element.at[switch_id] - return is_from_bus + return bus_idx == net.line.from_bus.at[line_idx] def get_from_bus_info_bus_line(net, bus_idx, line_idx): # get bus nfo of given line - for line in net.line.index: # can be written better - if line == line_idx: - if bus_idx == net.line.from_bus.at[line_idx]: # asks if switch is at from_bus - is_from_bus = True - # sc_fraction = 0.95 - else: # else it is at to_bus - is_from_bus = False - # sc_fraction = 0.05 - - return is_from_bus + return bus_idx == net.line.from_bus.at[line_idx] def get_line_impedance(net, line_idx): @@ -218,23 +191,19 @@ def get_line_impedance(net, line_idx): line_length = net.line.length_km.at[line_idx] line_r_per_km = net.line.r_ohm_per_km.at[line_idx] line_x_per_km = net.line.x_ohm_per_km.at[line_idx] - Z_line = complex(line_r_per_km * line_length, line_x_per_km * line_length) # Z = R + jX - return Z_line + z_line = complex(line_r_per_km * line_length, line_x_per_km * line_length) # Z = R + jX + return z_line -def get_lowest_impedance_line(net, lines): +def get_lowest_impedance_line(net: pandapowerNet, lines): # get the low impedenceline - i = 0 + min_imp_line = None + min_impedance = float('inf') for line in lines: impedance = abs(get_line_impedance(net, line)) - if i == 0: - min_imp_line = line + if impedance < min_impedance: min_impedance = impedance - else: - if impedance < min_impedance: - min_impedance = impedance - min_imp_line = line - i += 1 + min_imp_line = line return min_imp_line @@ -260,6 +229,42 @@ def fuse_bus_switches(net, bus_switches): return net +def get_fault_annotation(net: pandapowerNet, fault_current: float = .0, font_size_bus: float = 0.06) -> PatchCollection: + max_bus_idx = max(net.bus.dropna(subset=['geo']).index) + fault_text = f'\tI_sc = {fault_current}kA' + + fault_geo_x_y: Tuple[float, float] = next(geojson.utils.coords(geojson.loads(net.bus.geo.at[max_bus_idx]))) + fault_geo_x_y = (fault_geo_x_y[0], fault_geo_x_y[1] - font_size_bus + 0.02) + + # list of new geo data for line (half position of switch) + fault_annotate: PatchCollection = plot.create_annotation_collection( + texts=[fault_text], + coords=[fault_geo_x_y], + size=font_size_bus, + prop=None + ) + + return fault_annotate + + +def get_sc_location_annotation(net: pandapowerNet, sc_location: float, font_size_bus: float = 0.06) -> PatchCollection: + max_bus_idx = max(net.bus.dropna(subset=['geo']).index) + sc_text = f'\tsc_location: {sc_location * 100}%' + + # list of new geo data for line (middle of position of switch) + sc_geo_x_y = next(geojson.utils.coords(geojson.loads(net.bus.geo.at[max_bus_idx]))) + sc_geo_x_y = (sc_geo_x_y[0], sc_geo_x_y[1] + 0.02) + + sc_annotate: PatchCollection = plot.create_annotation_collection( + texts=[sc_text], + coords=[sc_geo_x_y], + size=font_size_bus, + prop=None + ) + + return sc_annotate + + def plot_tripped_grid(net, trip_decisions, sc_location, bus_size=0.055, plot_annotations=True): # plot the tripped grid of net_sc if MPLCURSORS_INSTALLED: @@ -348,6 +353,8 @@ def plot_tripped_grid(net, trip_decisions, sc_location, bus_size=0.055, plot_ann line_text = [] line_geodata = [] + fault_current: float = .0 + # for Switches in trip_decisions: for line in net.line.index: @@ -362,20 +369,22 @@ def plot_tripped_grid(net, trip_decisions, sc_location, bus_size=0.055, plot_ann respect_in_service=False) bus_list = list(get_bus_index) + bus_coords: List[Tuple[float, float]] = [ + geojson.utils.coords(geojson.loads(net.bus.geo.at[bus])) for bus in bus_list + ] # TODO: # place annotations on middle of the line - line_geo_x = (net.bus_geodata.iloc[bus_list[0]].x + net.bus_geodata.iloc[bus_list[1]].x) / 2 - - line_geo_y = ((net.bus_geodata.iloc[bus_list[0]].y + net.bus_geodata.iloc[bus_list[1]].y) / 2) + 0.05 + line_geo_x = (bus_coords[0][0] + bus_coords[1][0]) / 2 + line_geo_y = ((bus_coords[0][1] + bus_coords[1][1]) / 2) + 0.05 line_geo_x_y = [line_geo_x, line_geo_y] # list of new geo data for line (half position of switch) line_geodata.append(tuple(line_geo_x_y)) - fault_current = round(net.res_bus_sc['ikss_ka'].at[max(net.bus.index)], - 2) # round(Switches['Fault Current'],2) + fault_current = round(net.res_bus_sc['ikss_ka'].at[max(net.bus.index)], 2) + # round(Switches['Fault Current'],2) line_text.append(text_line) @@ -385,67 +394,27 @@ def plot_tripped_grid(net, trip_decisions, sc_location, bus_size=0.055, plot_ann # Bus Annotatations bus_text = [] - for i in net.bus_geodata.index: + for i in net.bus.geo.dropna().index: bus_texts = 'bus_' + str(i) bus_text.append(bus_texts) bus_text = bus_text[:-1] - bus_geodata = net.bus_geodata[['x', 'y']] + bus_geodata = net.bus.geo.dropna().apply(geojson.loads).apply(geojson.utils.coords).apply(next).to_list() # placing bus - bus_geodata['x'] = bus_geodata['x'] - 0.11 - bus_geodata['y'] = bus_geodata['y'] + 0.095 + bus_index = [(x[0] - 0.11, x[1] + 0.095) for x in bus_geodata] # TODO: - bus_index = [tuple(x) for x in bus_geodata.to_numpy()] bus_annotate = plot.create_annotation_collection(texts=bus_text, coords=bus_index, size=0.06, prop=None) collection.append(bus_annotate) # Short circuit annotations - fault_geodata = [] - - fault_text = [] - - fault_texts = ' I_sc = ' + str(fault_current) + 'kA' - - font_size_bus = 0.06 # font size of fault location text - - fault_geo_x = net.bus_geodata.iloc[max(net.bus_geodata.index)][0] - fault_geo_y = net.bus_geodata.iloc[max(net.bus_geodata.index)][1] - font_size_bus + 0.02 - - fault_geo_x_y = [fault_geo_x, fault_geo_y] - - # list of new geo data for line (half position of switch) - fault_geodata.append(tuple(fault_geo_x_y)) - - fault_text.append(fault_texts) - fault_annotate = plot.create_annotation_collection(texts=fault_text, coords=fault_geodata, size=0.06, prop=None) - - collection.append(fault_annotate) + collection.append(get_fault_annotation(net, fault_current)) # sc_location annotation - sc_text = [] - sc_geodata = [] - - sc_texts = ' sc_location: ' + str(sc_location * 100) + '%' - - # font_size_bus=0.06 # font size of sc location - - sc_geo_x = net.bus_geodata.iloc[max(net.bus_geodata.index)][0] - - sc_geo_y = net.bus_geodata.iloc[max(net.bus_geodata.index)][1] + 0.02 - - sc_geo_x_y = [sc_geo_x, sc_geo_y] - - # list of new geo data for line (middle of position of switch) - sc_geodata.append(tuple(sc_geo_x_y)) - - sc_text.append(sc_texts) - sc_annotate = plot.create_annotation_collection(texts=sc_text, coords=sc_geodata, size=0.06, prop=None) - - collection.append(sc_annotate) + collection.append(get_sc_location_annotation(net, sc_location)) # switch annotations # from pandapower.protection.implemeutility_functions import switch_geodata @@ -595,7 +564,9 @@ def plot_tripped_grid_protection_device(net, trip_decisions, sc_location, sc_bus bus_list = list(get_bus_index) # place annotations on middle of the line - bus_coords = list(zip(*net.bus.geo.iloc[bus_list[0:2]].apply(geojson.loads).apply(geojson.utils.coords).apply(next).to_list())) + bus_coords = list( + zip(*net.bus.geo.iloc[bus_list[0:2]].apply(geojson.loads).apply(geojson.utils.coords).apply( + next).to_list())) line_geo_x_y = [sum(x) / 2 for x in bus_coords] line_geo_x_y[1] += 0.05 @@ -610,7 +581,7 @@ def plot_tripped_grid_protection_device(net, trip_decisions, sc_location, sc_bus line_annotate = plot.create_annotation_collection(texts=line_text, coords=line_geodata, size=0.06, prop=None) collection.append(line_annotate) - # Bus Annotatations + # Bus Annotations bus_text = [] for i in net.bus.index: bus_texts = f'bus_{i}' @@ -626,42 +597,13 @@ def plot_tripped_grid_protection_device(net, trip_decisions, sc_location, sc_bus bus_annotate = plot.create_annotation_collection(texts=bus_text, coords=bus_geodata, size=0.06, prop=None) collection.append(bus_annotate) - font_size_bus = 0.06 # font size of fault location text max_bus_idx = max(net.bus.dropna(subset=['geo']).index) # Short circuit annotations - fault_geodata = [] - fault_text = [] - fault_texts = f'\tI_sc = {fault_current}kA' - - fault_geo_x_y = next(geojson.utils.coords(geojson.loads(net.bus.geo.at[max_bus_idx]))) - fault_geo_x_y = (fault_geo_x_y[0], fault_geo_x_y[1] - font_size_bus + 0.02) - - # list of new geo data for line (half position of switch) - fault_geodata.append(fault_geo_x_y) - - fault_text.append(fault_texts) - fault_annotate = plot.create_annotation_collection(texts=fault_text, coords=fault_geodata, size=0.06, prop=None) - - collection.append(fault_annotate) + collection.append(get_fault_annotation(net, fault_current)) # sc_location annotation - sc_text = [] - sc_geodata = [] - sc_texts = f'\tsc_location: {sc_location * 100}%' - - # font_size_bus=0.06 # font size of sc location - - sc_geo_x_y = next(geojson.utils.coords(geojson.loads(net.bus.geo.at[max_bus_idx]))) - sc_geo_x_y = (sc_geo_x_y[0], sc_geo_x_y[1] + 0.02) - - # list of new geo data for line (middle of position of switch) - sc_geodata.append(sc_geo_x_y) - - sc_text.append(sc_texts) - sc_annotate = plot.create_annotation_collection(texts=sc_text, coords=sc_geodata, size=0.06, prop=None) - - collection.append(sc_annotate) + collection.append(get_sc_location_annotation(net, sc_location)) # switch annotations # from pandapower.protection.utility_functions import switch_geodata @@ -699,7 +641,7 @@ def get_connected_lines(net, bus_idx): # first one. E.g. the from_bus given the to_bus of a line. @deprecated("Use pandapower.next_bus(net, bus, element_id instead!") def next_buses(net, bus, element_id): - return pp.next_bus(net,bus,element_id) + return pp.next_bus(net, bus, element_id) # get the connected bus listr from start to end bus @@ -896,89 +838,59 @@ def bus_path_from_to_bus(net, radial_start_bus, loop_start_bus, end_bus): return bus_path -def get_switches_in_path(net, pathes): +def get_switches_in_path(net, paths): """function calculate the switching times from the bus path""" - Lines_in_path = [] + lines_in_path: List[List] = [] - for path in pathes: - Lines_at_path = [] + for path in paths: + lines_at_path: set = set() for bus in path: - Lines_at_paths = [] - lines_at_bus = pp.get_connected_elements(net, "l", bus) - - for line in lines_at_bus: - Lines_at_path.append(line) - - for Line1 in Lines_at_path: - if net.line.from_bus[Line1] in path: - if net.line.to_bus[Line1] in path: - if Line1 not in Lines_at_paths: - Lines_at_paths.append(Line1) - - Lines_in_path.append(Lines_at_paths) + lines_at_path.update(pp.get_connected_elements(net, "l", bus)) - switches_in_net = net.switch.index - switches_in_path = [] + lines_at_paths = [ + line for line in lines_at_path + if net.line.from_bus[line] in path and net.line.to_bus[line] in path + ] - for Linepath in Lines_in_path: - switches_at_path = [] + lines_in_path.append(lines_at_paths) - for Line in Linepath: - - for switch in switches_in_net: - if net.switch.et[switch] == "l": - if net.switch.element[switch] == Line: - switches_at_path.append(switch) - switches_in_path.append(switches_at_path) + switches_in_path = [ + [net.switch[(net.switch['et'] == 'l') & (net.switch['element'] == line)].index for line in line_path] + for line_path in lines_in_path + ] return switches_in_path -def get_vi_angle(net, switch_id, powerflow_results=False): +def get_vi_angle(net: pandapowerNet, switch_id: int, **kwargs) -> float: """calculate the angle between voltage and current with reference to voltage""" - pp.runpp(net) - line_idx = get_line_idx(net, switch_id) - bus_idx = get_bus_idx(net, switch_id) - - if powerflow_results: - - if get_from_bus_info_switch(net, switch_id): - - P = net.res_line.p_from_mw.at[line_idx] - Q = net.res_line.q_from_mvar.at[line_idx] + if "powerflow_results" in kwargs: + logger.warning( + "The powerflow_results argument is deprecated and will be removed in the future." + ) - vm = net.bus.vn_kv.at[bus_idx] * net.res_line.vm_from_pu.at[line_idx] - else: - P = net.res_line.p_to_mw.at[line_idx] - Q = net.res_line.q_to_mvar.at[line_idx] + pp.runpp(net) + line_idx = net.switch.element.at[switch_id] - vm = net.bus.vn_kv.at[bus_idx] * net.res_line.vm_to_pu.at[line_idx] + if get_from_bus_info_switch(net, switch_id): + p = net.res_line_sc.p_from_mw.at[line_idx] + q = net.res_line_sc.q_from_mvar.at[line_idx] else: - - if get_from_bus_info_switch(net, switch_id): - - P = net.res_line_sc.p_from_mw.at[line_idx] - Q = net.res_line_sc.q_from_mvar.at[line_idx] - - vm = net.bus.vn_kv.at[bus_idx] * net.res_line_sc.vm_from_pu.at[line_idx] - - else: - P = net.res_line_sc.p_to_mw.at[line_idx] - Q = net.res_line_sc.q_to_mvar.at[line_idx] - vm = net.bus.vn_kv.at[bus_idx] * net.res_line_sc.vm_to_pu.at[line_idx] - - if P > 0 and Q > 0: - vi_angle = math.degrees(math.atan(Q / P)) - elif P < 0 and Q >= 0: - vi_angle = math.degrees(math.atan(Q / P)) + 180 - elif P < 0 and Q < 0: - vi_angle = math.degrees(math.atan(Q / P)) - 180 - elif P == 0 and Q > 0: + p = net.res_line_sc.p_to_mw.at[line_idx] + q = net.res_line_sc.q_to_mvar.at[line_idx] + + if p > 0 and q > 0: + vi_angle = math.degrees(math.atan(q / p)) + elif p < 0 <= q: + vi_angle = math.degrees(math.atan(q / p)) + 180 + elif p < 0 and q < 0: + vi_angle = math.degrees(math.atan(q / p)) - 180 + elif p == 0 < q: vi_angle = 90 - elif P == 0 and Q < 0: + elif p == 0 > q: vi_angle = -90 else: vi_angle = math.inf @@ -1007,8 +919,8 @@ def bus_path_multiple_ext_bus(net): elif len(from_bus_path) != len(to_bus_path): if len(from_bus_path) > 1 and len(to_bus_path) > 1: - minlen = min(len(from_bus_path), len(to_bus_path)) - if from_bus_path[minlen - 1] != to_bus_path[minlen - 1]: + min_len = min(len(from_bus_path), len(to_bus_path)) + if from_bus_path[min_len - 1] != to_bus_path[min_len - 1]: if len(from_bus_path) < len(to_bus_path): from_bus_path.append(to_bus_path[-1]) max_bus_path.append(from_bus_path) @@ -1025,19 +937,19 @@ def bus_path_multiple_ext_bus(net): return bus_path - # get the line path from the given bus path +# get the line path from the given bus path def get_line_path(net, bus_path): """ Function return the list of line path from the given bus path""" - line_path=[] - for i in range(len(bus_path)-1): - bus1=bus_path[i] - bus2=bus_path[i+1] - line1=net.line[(net.line.from_bus==bus1) & (net.line.to_bus==bus2)].index.to_list() - line2=net.line[(net.line.from_bus==bus2) & (net.line.to_bus==bus1)].index.to_list() - if len(line2)==0: + line_path = [] + for i in range(len(bus_path) - 1): + bus1 = bus_path[i] + bus2 = bus_path[i + 1] + line1 = net.line[(net.line.from_bus == bus1) & (net.line.to_bus == bus2)].index.to_list() + line2 = net.line[(net.line.from_bus == bus2) & (net.line.to_bus == bus1)].index.to_list() + if len(line2) == 0: line_path.append(line1[0]) - if len(line1)==0: + if len(line1) == 0: line_path.append(line2[0]) return line_path