-
Notifications
You must be signed in to change notification settings - Fork 0
/
nanovna_mag.py
114 lines (91 loc) · 3.05 KB
/
nanovna_mag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from rotor import *
import time
import os
import shutil
from os import listdir
from os.path import isfile, join
import csv
import math
import subprocess
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
class nanovna_mag(FileSystemEventHandler):
basepath = "/tmp"
temp_folder = basepath+"/antenna_diagram"
ts_folder = temp_folder+"/ts_temp"
ts_file = ""
_proc = 0
_rotor = None
_rotor_step = 1
_continue_event = None
antenna_diagram = []
def __init__(self, rotor, rotor_step):
# get rotor from main
self._rotor = rotor
self._rotor_step = rotor_step
#init fs
try:
shutil.rmtree(self.temp_folder)
except FileNotFoundError:
print("removing folder failed, does not exist")
#endtry
os.mkdir(self.temp_folder)
os.mkdir(self.ts_folder)
# init event handler (watchdog)
self._event_handler = self
self._observer = Observer()
self._observer.schedule(self._event_handler, path=self.ts_folder, recursive=False)
self._observer.start()
#enddef
def __del__(self):
self._observer.stop()
def start(self, freq):
self._proc = subprocess.Popen(["python3", "./nanovna-saver/nanovna-saver.py", "-o", self.ts_folder, "-f", str(freq * 1000000), "-t", str((freq * 1000000) + 1000), "-i"],
stdout=subprocess.DEVNULL)
self._continue_event = threading.Event()
#enddef
def stop(self):
self._proc.terminate()
#enddef
def on_created(self, event):
#print("on_created", event.src_path)
mag_array = []
file_ok = False
while not file_ok:
# read touchstone file and calculate magnitude of S21 for all sweep points
with open(event.src_path, newline='') as csvfile:
reader = csv.reader(csvfile, delimiter=' ')
for row in reader:
#print(f"row = '{row}'")
if row[0] != '#':
mag = math.sqrt(math.pow(float(row[4]), 2) + math.pow(float(row[5]), 2))
mag_array.append(mag)
#endfor
#endwith
# retry until file has correct length (101 lines)
if len(mag_array) == 101:
file_ok=True
else:
mag_array=[]
#endif
#endfor
# calculate average magnitude -> we want one value for one step
avg = 0
for i in mag_array:
avg = avg + i
avg = avg / len(mag_array)
az = self._rotor.get_azimuth()
print(f"{az} {avg}")
self.antenna_diagram.append([az, avg])
try:
self._rotor.step_azimuth(self._rotor_step)
except RotorDegreeOutOfRange:
print("Rotor at end, stopping")
self._continue_event.set()
#endtry
#enddef
def wait_for_continue(self):
self._continue_event.wait()
#enddef
#endclass