Queue и Threading
Извиняюсь за скорее всего глупую просьбу, ну не могли бы вы мне ткнуть на мои ошибки. Python изучаю недавно, поэтому немного затрудняюсь пока что с поиском правильных решений. Имеется вот такой код:
from queue import Queue
from threading import Thread, enumerate
import threading
from tradingview_ta import *
import xlsxwriter
workbook = xlsxwriter.Workbook('test.xlsx')
worksheet = workbook.add_worksheet()
txt = open(r"C:\Users\Users\Desktop\python\text.txt", "r", encoding='utf-8')
tickers = []
def worker():
count = 1
for s in txt:
tickers = s.rstrip()
data = TA_Handler(symbol=tickers, exchange="ASCENDEX",screener="crypto",\
interval="1d")
interval = data.get_analysis().interval
ticker = data.get_analysis().symbol
exhange = data.get_analysis().exchange
if s:
count += 1
qput = q.put(interval)
q.get(qput)
worksheet.write(f'A{count}', interval)
worksheet.write(f'B{count}', ticker)
worksheet.write(f'C{count}', exchange)
print(interval)
q.task_done()
q = Queue()
for i in range(1, 30):
threading.Thread(target=worker, daemon=True).start()
q.join()
workbook.close()
Пытаюсь сделать так, чтобы из файла txt брались тикеры, далее подставлялись в tickers:
data = TA_Handler(symbol=tickers, exchange="ASCENDEX",screener="crypto",
interval="1d")
после в очередь даю задания и получаю их. Далее в xlsx файл записываются значения в A1..... intervals в B1........ tickers В С1......exchange
Много статей прочитал уже о многопоточности, видимо на начальных этапах сложно усвоить все. Подскажите пожалуйста на примере моего кода или вашего, как правильно нужно запускать многопоток, чтобы потоки между собой не перебивались и не хватались за одной и тоже одновременно.
Ответы (2 шт):
Как пишут, не бывает глупых вопросов. Но ваш немного не закончен, так как не понятно, что сейчас на выходе у вашего кода относительно ваших желаний, и в чём проблема. Вот вам два моих учебных кода - добавил в них небольшие комментарии. Один код - на простую запись и передачу информации, другой - на работу через Queue.
И советую писать по PEP-8, включая длины строк и пропуски строк, - смотреть неудобно. Внес правки в ваш код в этом отношении. Да и минимальные комментарии желательно иметь в коде, а не вне его:-)
'''
Проводим запись в файл одним потоком, чтение - другим, с простейшей
передачей информации в поток списком.
'''
from threading import Thread
def thread_read (file_name, value_holder):
print('1')
with open(file_name, 'r',encoding="utf-8") as f:
value_holder[0] = f.read()
def thread_write (file_name, data):
f = open(file_name, 'w')
f.write(data)
f.close
if __name__ == "__main__":
file_r="read_thread.txt"
# Способ передачи результата между потоками - списком
result_holder = [None]
file_w="write_thread.txt"
new_th1=Thread (target=thread_read,args=(file_r,result_holder))
new_th1.start()
# Ждем, когда new_th1 закончит выполнение
# Или можно было import time и time.sleep (0.1), например
new_th1.join()
new_th2=Thread (target=thread_write,args=(file_w,result_holder[0]))
new_th2.start()
Вот другой код:
'''
20 случайными значениями заполняем Queve, передаём в 4 запускаемых потока.
После этого делаем вывод информации в консоль о том, в каком потоке
произошло извлечение данных
'''
from threading import Thread, current_thread
import queue
import random
def worker(holder):
for y in range(5):
name=current_thread().name
print (f'Process:{name} Value:{holder.get()}')
if __name__ == "__main__":
result_holder = queue.Queue()
list1=[random.randint(0,100) for i in range (20)]
print(list1)
for i in list1:
result_holder.put(i)
for z in range(4):
new_proc1=Thread (target=worker,name=str(z), args=(result_holder,))
new_proc1.start()
new_proc1.join()
Вопрос в том, что именно вы пытаетесь ускорить. Насколько я понимаю, основное, что тут нужно ускорять, это этот кусок, где вызываются методы TA_Handler. Чтение из файла и запись не настолько уж тормозят, плюс к тому их довольно сложно нормально превратить в многопоточный вариант. Поэтому лучше всего вам сделать по такой схеме:
- написать функцию, которая получает на вход тикеры и возвращает по ним аналитику
- прочитать файл с тикерами
- вызвать
multiprocessing.Pool, передав ему на вход вашу функцию и список прочтённых тикеров - проитерироваться по полученному результату, записав его в выходной файл
- да, и ещё нужно не забыть обернуть основной код в отдельную функцию и сделать специальный
ifс проверкой, иначе будут проблемы с многопроцессностью
Таким образом, чтение файла и запись в эксель останутся однопоточными (какими они и должны быть), а теханализ будет произведён многопроцессно, причём практически без усилий с вашей стороны.
Основной кусок кода для понимания:
from multiprocessing import Pool
# ... здесь остальные импорты
def worker(tickers):
tickers = tickers.rstrip()
data = TA_Handler(symbol=tickers, exchange="ASCENDEX",screener="crypto", interval="1d")
interval = data.get_analysis().interval
ticker = data.get_analysis().symbol
exhange = data.get_analysis().exchange
return interval, ticker, exchange
def main():
# ... здесь открытие файла и экселя
with Pool() as pool:
for interval, ticker, exchange in pool.map(worker, txt.readlines())
worksheet.write(f'A{count}', interval)
worksheet.write(f'B{count}', ticker)
worksheet.write(f'C{count}', exchange)
# ... здесь закрытие экселя
if __name__ == "__main__":
main()
P.S. Ну а очередь queue вам тут вообще не нужна.