Выполнение функций по расписанию
Всем привет! Вопрос в следующем. Есть три функции:
def func1():
#do something
return value1
def func2():
#do something
return value2
def func3():
#do something
return value3
далее есть функция main, в которой результат этих трех функций записывается в словарь. примерно вот так:
def main():
#do something
my_dict = {
'key1': func1(),
'key2': func2(),
'key3': func3()
}
#do something with my_dict
Как можно настроить выполнение функции main так, чтобы функция func1() и func2() запускались каждые 10 секунд, а функция func3() - каждые 20 секунд? Выполнение функции main должно быть по сути бесконечным, пока приложение активно. Функции 1, 2 и 3 при каждой итерации возвращают разные значения. Из функции main, соответственно, после каждой итерации должно возвращаться соответствующее Мне кажется, что можно что-то через модуль threading придумать, но что-то никак в голову не приходит решение =(
Ответы (2 шт):
Если верно уловил вашу мысль, то действительно можем попробовать решить вашу задачу через модуль threading.
Пример кода:
import threading
import time
import random
from collections import namedtuple, defaultdict
def func1():
return random.randint(1, 10)
def func2():
return random.randint(20, 30)
def func3():
return random.randint(40, 50)
def run_func(thread_data, func, interval = 10):
stop_event, lock, my_dict = unpack_thread_data(thread_data)
name = func.__name__
while not stop_event.is_set():
result = func()
with lock:
my_dict[name].append(result)
stop_event.wait(interval)
print(f"Поток {name} остановился!")
def print_dict(thread_data):
stop_event, lock, my_dict = unpack_thread_data(thread_data)
while not stop_event.is_set():
with lock:
print(my_dict)
stop_event.wait(2)
print("Поток print_dict остановился!")
def unpack_thread_data(thread_data):
return thread_data.stop_event, thread_data.lock, thread_data.my_dict
def create_thread_data():
ThreadData = namedtuple('ThreadData', ['my_dict', 'lock', 'stop_event'])
return ThreadData(
defaultdict(list),
threading.Lock(),
threading.Event()
)
def start_threads(thread_data):
threads = [
threading.Thread(target=run_func, args=(thread_data, func1)),
threading.Thread(target=run_func, args=(thread_data, func2)),
threading.Thread(target=run_func, args=(thread_data, func3, 20)),
threading.Thread(target=print_dict, args=(thread_data,))
]
for thread in threads:
thread.start()
return threads
def stop_threads(threads, stop_event):
stop_event.set()
for thread in threads:
thread.join()
print("Все потоки остановились.")
def main():
thread_data = create_thread_data()
threads = start_threads(thread_data)
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("Процесс прерван. Ожидаем завершение потоков...")
finally:
stop_threads(threads, thread_data.stop_event)
input("Enter для выхода из программы!")
if __name__ == "__main__":
main()
Вывод:
defaultdict(<class 'list'>, {'func1': [6], 'func2': [27], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6], 'func2': [27], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6], 'func2': [27], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6], 'func2': [27], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6], 'func2': [27], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6, 4], 'func2': [27, 21], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6, 4], 'func2': [27, 21], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6, 4], 'func2': [27, 21], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6, 4], 'func2': [27, 21], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6, 4], 'func2': [27, 21], 'func3': [42]})
defaultdict(<class 'list'>, {'func1': [6, 4, 8], 'func2': [27, 21, 21], 'func3': [42, 48]})
defaultdict(<class 'list'>, {'func1': [6, 4, 8], 'func2': [27, 21, 21], 'func3': [42, 48]})
defaultdict(<class 'list'>, {'func1': [6, 4, 8], 'func2': [27, 21, 21], 'func3': [42, 48]})
defaultdict(<class 'list'>, {'func1': [6, 4, 8], 'func2': [27, 21, 21], 'func3': [42, 48]})
defaultdict(<class 'list'>, {'func1': [6, 4, 8], 'func2': [27, 21, 21], 'func3': [42, 48]})
defaultdict(<class 'list'>, {'func1': [6, 4, 8, 7], 'func2': [27, 21, 21, 23], 'func3': [42, 48]})
defaultdict(<class 'list'>, {'func1': [6, 4, 8, 7], 'func2': [27, 21, 21, 23], 'func3': [42, 48]})
Процесс прерван. Ожидаем завершение потоков...
Поток print_dict остановился!
Поток func1 остановился!
Поток func3 остановился!
Поток func2 остановился!
Все потоки остановились.
Enter для выхода из программы!
P.S. Ctrl+C
- для принудительной остановки программы.
Можно сделать с помощью генераторов.
import time
def func1():
#do something
return value1
def func2():
#do something
return value2
def func3():
#do something
return value3
def main():
count = 2
while True:
time.sleep(10)
my_dict = {
'key1': func1(),
'key2': func2(),
}
if count % 2 == 0:
my_dict['key3'] = func3()
count += 1
yield my_dict # (1)
n = main() # (2)
print(next(n)) # (3)
print(next(n))
yield
- ключевое слово по работе напоминающееreturn
но не прерывающее функции. Оно делает функцию генератором.Записываем объект генератора в
n
Далее, сколько нужно получаем след. значение с помощью
next
.