ME 405 Romi
Loading...
Searching...
No Matches
cotask.py
Go to the documentation of this file.
29
30import gc # Memory allocation garbage collector
31import utime # Micropython version of time library
32import micropython # This shuts up incorrect warnings
33
34
35
66class Task:
67
68
88 def __init__(self, run_fun, name="NoName", priority=0, period=None,
89 profile=False, trace=False, shares=()):
90 # The function which is run to implement this task's code. Since it
91 # is a generator, we "run" it here, which doesn't actually run it but
92 # gets it going as a generator which is ready to yield values
93 if shares:
94 self._run_gen = run_fun(shares)
95 else:
96 self._run_gen = run_fun()
97
98
99 self.name = name
100
101
103 self.priority = int(priority)
104
105
110 if period != None:
111 self.period = int(period * 1000)
112 self._next_run = utime.ticks_us() + self.period
113 else:
114 self.period = period
115 self._next_run = None
116
117 # Flag which causes the task to be profiled, in which the execution
118 # time of the @c run() method is measured and basic statistics kept.
119 self._prof = profile
120 self.reset_profile()
121
122 # The previous state in which the task last ran. It is used to watch
123 # for and track state transitions.
124 self._prev_state = 0
125
126 # If transition tracing has been enabled, create an empty list in
127 # which to store transition (time, to-state) stamps
128 self._trace = trace
129 self._tr_data = []
130 self._prev_time = utime.ticks_us()
131
132
134 self.go_flag = False
135
136
137
143 def schedule(self) -> bool:
144 if self.ready():
145
146 # Reset the go flag for the next run
147 self.go_flag = False
148
149 # If profiling, save the start time
150 if self._prof:
151 stime = utime.ticks_us()
152
153 # Run the method belonging to the state which should be run next
154 curr_state = next(self._run_gen)
155
156 # If profiling or tracing, save timing data
157 if self._prof or self._trace:
158 etime = utime.ticks_us()
159
160 # If profiling, save timing data
161 if self._prof:
162 self._runs += 1
163 runt = utime.ticks_diff(etime, stime)
164 if self._runs > 2:
165 self._run_sum += runt
166 if runt > self._slowest:
167 self._slowest = runt
168
169 # If transition logic tracing is on, record a transition; if not,
170 # ignore the state. If out of memory, switch tracing off and
171 # run the memory allocation garbage collector
172 if self._trace:
173 try:
174 if curr_state != self._prev_state:
175 self._tr_data.append(
176 (utime.ticks_diff(etime, self._prev_time),
177 curr_state))
178 except MemoryError:
179 self._trace = False
180 gc.collect()
181
182 self._prev_state = curr_state
183 self._prev_time = etime
184
185 return True
186
187 else:
188 return False
189
190
191
196 @micropython.native
197 def ready(self) -> bool:
198 # If this task uses a timer, check if it's time to run run() again. If
199 # so, set go flag and set the timer to go off at the next run time
200 if self.period != None:
201 late = utime.ticks_diff(utime.ticks_us(), self._next_run)
202 if late > 0:
203 self.go_flag = True
204 self._next_run = utime.ticks_diff(self.period,
205 -self._next_run)
206
207 # If keeping a latency profile, record the data
208 if self._prof:
209 self._late_sum += late
210 if late > self._latest:
211 self._latest = late
212
213 # If the task doesn't use a timer, we rely on go_flag to signal ready
214 return self.go_flag
215
216
217
221 def set_period(self, new_period):
222 if new_period is None:
223 self.period = None
224 else:
225 self.period = int(new_period) * 1000
226
227
228
230 def reset_profile(self):
231 self._runs = 0
232 self._run_sum = 0
233 self._slowest = 0
234 self._late_sum = 0
235 self._latest = 0
236
237
238
242 def get_trace(self):
243 tr_str = 'Task ' + self.name + ':'
244 if self._trace:
245 tr_str += '\n'
246 last_state = 0
247 total_time = 0.0
248 for item in self._tr_data:
249 total_time += item[0] / 1000000.0
250 tr_str += '{: 12.6f}: {: 2d} -> {:d}\n'.format (total_time,
251 last_state, item[1])
252 last_state = item[1]
253 else:
254 tr_str += ' not traced'
255 return tr_str
256
257
258
261 def go(self):
262 self.go_flag = True
263
264
265
269 def __repr__(self):
270 rst = f"{self.name:<16s}{self.priority: 4d}"
271 try:
272 rst += f"{(self.period / 1000.0): 10.1f}"
273 except TypeError:
274 rst += ' -'
275 rst += f"{self._runs: 8d}"
276
277 if self._prof and self._runs > 0:
278 avg_dur = (self._run_sum / self._runs) / 1000.0
279 avg_late = (self._late_sum / self._runs) / 1000.0
280 rst += f"{avg_dur: 10.3f}{(self._slowest / 1000.0): 10.3f}"
281 if self.period != None:
282 rst += f"{avg_late: 10.3f}{(self._latest / 1000.0): 10.3f}"
283 return rst
284
285
286# =============================================================================
287
288
300
301
303 def __init__(self):
304
305
309 self.pri_list = []
310
311
312
316 def append(self, task):
317 # See if there's a tasklist with the given priority in the main list
318 new_pri = task.priority
319 for pri in self.pri_list:
320 # If a tasklist with this priority exists, add this task to it.
321 if pri[0] == new_pri:
322 pri.append(task)
323 break
324
325 # If the priority isn't in the list, this else clause starts a new
326 # priority list with this task as first one. A priority list has the
327 # priority as element 0, an index into the list of tasks (used for
328 # round-robin scheduling those tasks) as the second item, and tasks
329 # after those
330 else:
331 self.pri_list.append([new_pri, 2, task])
332
333 # Make sure the main list (of lists at each priority) is sorted
334 self.pri_list.sort(key=lambda pri: pri[0], reverse=True)
335
336
337
346 @micropython.native
347 def rr_sched(self):
348 # For each priority level, run all tasks at that level
349 for pri in self.pri_list:
350 for task in pri[2:]:
351 task.schedule()
352
353
354
359 @micropython.native
360 def pri_sched(self):
361 # Go down the list of priorities, beginning with the highest
362 for pri in self.pri_list:
363 # Within each priority list, run tasks in round-robin order
364 # Each priority list is [priority, index, task, task, ...] where
365 # index is the index of the next task in the list to be run
366 tries = 2
367 length = len(pri)
368 while tries < length:
369 ran = pri[pri[1]].schedule()
370 tries += 1
371 pri[1] += 1
372 if pri[1] >= length:
373 pri[1] = 2
374 if ran:
375 return
376
377
378
379 def __repr__(self):
380 ret_str = 'TASK PRI PERIOD RUNS AVG DUR MAX ' \
381 'DUR AVG LATE MAX LATE\n'
382 for pri in self.pri_list:
383 for task in pri[2:]:
384 ret_str += str(task) + '\n'
385
386 return ret_str
387
388
389
391task_list = TaskList()
392
393
394
395
Implements multitasking with scheduling and some performance logging.
Definition cotask.py:66
int _prev_state
Definition cotask.py:124
set_period(self, new_period)
This method sets the period between runs of the task to the given number of milliseconds,...
Definition cotask.py:221
reset_profile(self)
This method resets the variables used for execution time profiling.
Definition cotask.py:230
int _late_sum
Definition cotask.py:234
bool _trace
Definition cotask.py:128
__repr__(self)
This method converts the task to a string for diagnostic use.
Definition cotask.py:269
__init__(self, run_fun, name="NoName", priority=0, period=None, profile=False, trace=False, shares=())
Initialize a task object so it may be run by the scheduler.
Definition cotask.py:89
get_trace(self)
This method returns a string containing the task's transition trace.
Definition cotask.py:242
int _slowest
Definition cotask.py:166
bool ready(self)
This method checks if the task is ready to run.
Definition cotask.py:197
go(self)
Method to set a flag so that this task indicates that it's ready to run.
Definition cotask.py:261
int _latest
Definition cotask.py:210
name
The name of the task, hopefully a short and descriptive string.
Definition cotask.py:99
int _run_sum
Definition cotask.py:232
list _tr_data
Definition cotask.py:129
bool go_flag
Flag which is set true when the task is ready to be run by the scheduler.
Definition cotask.py:134
int _runs
Definition cotask.py:231
priority
The task's priority, an integer with higher numbers meaning higher priority.
Definition cotask.py:103
bool schedule(self)
This method is called by the scheduler; it attempts to run this task.
Definition cotask.py:143
int period
Definition cotask.py:111
A list of tasks used internally by the task scheduler.
Definition cotask.py:299
__repr__(self)
Create some diagnostic text showing the tasks in the task list.
Definition cotask.py:379
pri_sched(self)
Run tasks according to their priorities.
Definition cotask.py:360
list pri_list
The list of priority lists.
Definition cotask.py:309
rr_sched(self)
Run tasks in order, ignoring the tasks' priorities.
Definition cotask.py:347
__init__(self)
Initialize the task list.
Definition cotask.py:303
append(self, task)
Append a task to the task list.
Definition cotask.py:316