ME 405 Romi
Loading...
Searching...
No Matches
basic_tasks.py
Go to the documentation of this file.
1"""!
2@file basic_tasks.py
3 This file contains a demonstration program that runs some tasks, an
4 inter-task shared variable, and a queue. The tasks don't really @b do
5 anything; the example just shows how these elements are created and run.
6
7@author JR Ridgely
8@date 2021-Dec-15 JRR Created from the remains of previous example
9@copyright (c) 2015-2021 by JR Ridgely and released under the GNU
10 Public License, Version 2.
11"""
12
13import gc
14import pyb
15import cotask
16import task_share
17
18
19def task1_fun(shares):
20 """!
21 Task which puts things into a share and a queue.
22 @param shares A list holding the share and queue used by this task
23 """
24 # Get references to the share and queue which have been passed to this task
25 my_share, my_queue = shares
26
27 counter = 0
28 while True:
29 my_share.put(counter)
30 my_queue.put(counter)
31 counter += 1
32
33 yield 0
34
35
36def task2_fun(shares):
37 """!
38 Task which takes things out of a queue and share and displays them.
39 @param shares A tuple of a share and queue from which this task gets data
40 """
41 # Get references to the share and queue which have been passed to this task
42 the_share, the_queue = shares
43
44 while True:
45 # Show everything currently in the queue and the value in the share
46 print(f"Share: {the_share.get ()}, Queue: ", end='')
47 while q0.any():
48 print(f"{the_queue.get ()} ", end='')
49 print('')
50
51 yield 0
52
53
54# This code creates a share, a queue, and two tasks, then starts the tasks. The
55# tasks run until somebody presses ENTER, at which time the scheduler stops and
56# printouts show diagnostic information about the tasks, share, and queue.
57if __name__ == "__main__":
58 print("Testing ME405 stuff in cotask.py and task_share.py\r\n"
59 "Press Ctrl-C to stop and show diagnostics.")
60
61 # Create a share and a queue to test function and diagnostic printouts
62 share0 = task_share.Share('h', thread_protect=False, name="Share 0")
63 q0 = task_share.Queue('L', 16, thread_protect=False, overwrite=False,
64 name="Queue 0")
65
66 # Create the tasks. If trace is enabled for any task, memory will be
67 # allocated for state transition tracing, and the application will run out
68 # of memory after a while and quit. Therefore, use tracing only for
69 # debugging and set trace to False when it's not needed
70 task1 = cotask.Task(task1_fun, name="Task_1", priority=1, period=400,
71 profile=True, trace=False, shares=(share0, q0))
72 task2 = cotask.Task(task2_fun, name="Task_2", priority=2, period=1500,
73 profile=True, trace=False, shares=(share0, q0))
74 cotask.task_list.append(task1)
75 cotask.task_list.append(task2)
76
77 # Run the memory garbage collector to ensure memory is as defragmented as
78 # possible before the real-time scheduler is started
79 gc.collect()
80
81 # Run the scheduler with the chosen scheduling algorithm. Quit if ^C pressed
82 while True:
83 try:
84 cotask.task_list.pri_sched()
85 except KeyboardInterrupt:
86 break
87
88 # Print a table of task data and a table of shared information data
89 print('\n' + str (cotask.task_list))
90 print(task_share.show_all())
91 print(task1.get_trace())
92 print('')
Implements multitasking with scheduling and some performance logging.
Definition cotask.py:66
A queue which is used to transfer data from one task to another.
Definition task_share.py:90
An item which holds data to be shared between tasks.
task1_fun(shares)
Task which puts things into a share and a queue.
task2_fun(shares)
Task which takes things out of a queue and share and displays them.
show_all()
Create a string holding a diagnostic printout showing the status of each queue and share in the syste...
Definition task_share.py:46