Queue и Threading

Извиняюсь за скорее всего глупую просьбу, ну не могли бы вы мне ткнуть на мои ошибки. Python изучаю недавно, поэтому немного затрудняюсь пока что с поиском правильных решений. Имеется вот такой код:

from queue import Queue
from threading import Thread, enumerate
import threading
from tradingview_ta import *
import xlsxwriter


workbook = xlsxwriter.Workbook('test.xlsx')
worksheet = workbook.add_worksheet()

txt = open(r"C:\Users\Users\Desktop\python\text.txt", "r", encoding='utf-8')
tickers = []

def worker():
    count = 1
    for s in txt:
        tickers = s.rstrip()
        data = TA_Handler(symbol=tickers, exchange="ASCENDEX",screener="crypto",\
                          interval="1d")
        interval = data.get_analysis().interval
        ticker = data.get_analysis().symbol
        exhange = data.get_analysis().exchange

        if s:
            count += 1
            qput = q.put(interval)
            q.get(qput)
            worksheet.write(f'A{count}', interval)
            worksheet.write(f'B{count}', ticker)
            worksheet.write(f'C{count}', exchange)
            print(interval)
            q.task_done()


q = Queue()

for i in range(1, 30):
    threading.Thread(target=worker, daemon=True).start()

q.join()
workbook.close()

Пытаюсь сделать так, чтобы из файла txt брались тикеры, далее подставлялись в tickers:

data = TA_Handler(symbol=tickers, exchange="ASCENDEX",screener="crypto",   
 interval="1d")

после в очередь даю задания и получаю их. Далее в xlsx файл записываются значения в A1..... intervals в B1........ tickers В С1......exchange

Много статей прочитал уже о многопоточности, видимо на начальных этапах сложно усвоить все. Подскажите пожалуйста на примере моего кода или вашего, как правильно нужно запускать многопоток, чтобы потоки между собой не перебивались и не хватались за одной и тоже одновременно.


Ответы (2 шт):

Автор решения: Сергей

Как пишут, не бывает глупых вопросов. Но ваш немного не закончен, так как не понятно, что сейчас на выходе у вашего кода относительно ваших желаний, и в чём проблема. Вот вам два моих учебных кода - добавил в них небольшие комментарии. Один код - на простую запись и передачу информации, другой - на работу через Queue.

И советую писать по PEP-8, включая длины строк и пропуски строк, - смотреть неудобно. Внес правки в ваш код в этом отношении. Да и минимальные комментарии желательно иметь в коде, а не вне его:-)

'''
Проводим запись в файл одним потоком, чтение - другим, с простейшей 
передачей информации в поток списком. 
'''
from threading import Thread

def thread_read (file_name, value_holder):
    print('1')
    with open(file_name, 'r',encoding="utf-8") as f:
        value_holder[0] = f.read() 

def thread_write (file_name, data):
    f = open(file_name, 'w')
    f.write(data)
    f.close 

if __name__ == "__main__":
    file_r="read_thread.txt"
    # Способ передачи результата между потоками - списком
    result_holder = [None]
    file_w="write_thread.txt"
    new_th1=Thread (target=thread_read,args=(file_r,result_holder))
    new_th1.start()
    # Ждем, когда new_th1 закончит выполнение
    # Или можно было import time и time.sleep (0.1), например
    new_th1.join() 
    new_th2=Thread (target=thread_write,args=(file_w,result_holder[0]))
    new_th2.start()

Вот другой код:

'''
20 случайными значениями заполняем Queve, передаём в 4 запускаемых потока.
После этого делаем вывод информации в консоль о том, в каком потоке
произошло извлечение данных
'''
from threading import Thread, current_thread
import queue
import random

def worker(holder):
    for y in range(5):
        name=current_thread().name
        print (f'Process:{name} Value:{holder.get()}')

if __name__ == "__main__":
    result_holder = queue.Queue()
    list1=[random.randint(0,100) for i in range (20)]
    print(list1)
    for i in list1:
        result_holder.put(i)
    for z in range(4):
        new_proc1=Thread (target=worker,name=str(z), args=(result_holder,))
        new_proc1.start()
        new_proc1.join()
→ Ссылка
Автор решения: CrazyElf

Вопрос в том, что именно вы пытаетесь ускорить. Насколько я понимаю, основное, что тут нужно ускорять, это этот кусок, где вызываются методы TA_Handler. Чтение из файла и запись не настолько уж тормозят, плюс к тому их довольно сложно нормально превратить в многопоточный вариант. Поэтому лучше всего вам сделать по такой схеме:

  • написать функцию, которая получает на вход тикеры и возвращает по ним аналитику
  • прочитать файл с тикерами
  • вызвать multiprocessing.Pool, передав ему на вход вашу функцию и список прочтённых тикеров
  • проитерироваться по полученному результату, записав его в выходной файл
  • да, и ещё нужно не забыть обернуть основной код в отдельную функцию и сделать специальный if с проверкой, иначе будут проблемы с многопроцессностью

Таким образом, чтение файла и запись в эксель останутся однопоточными (какими они и должны быть), а теханализ будет произведён многопроцессно, причём практически без усилий с вашей стороны.

Основной кусок кода для понимания:

from multiprocessing import Pool

# ... здесь остальные импорты

def worker(tickers):
    tickers = tickers.rstrip()
    data = TA_Handler(symbol=tickers, exchange="ASCENDEX",screener="crypto",     interval="1d")
    interval = data.get_analysis().interval
    ticker = data.get_analysis().symbol
    exhange = data.get_analysis().exchange
    return interval, ticker, exchange

def main():

    # ... здесь открытие файла и экселя

    with Pool() as pool:
       for interval, ticker, exchange in pool.map(worker, txt.readlines())
            worksheet.write(f'A{count}', interval)
            worksheet.write(f'B{count}', ticker)
            worksheet.write(f'C{count}', exchange)

   # ... здесь закрытие экселя

if __name__ == "__main__":
    main()

P.S. Ну а очередь queue вам тут вообще не нужна.

→ Ссылка