[Python] Многопоточное скачивание файлов с ftp python-скриптом
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Зачем это нужно?Однажды передо мной встала задача копирования большого количества файлов с ftp-сервера. Нужно было делать бэкап. Казалось бы, что может быть проще! Но увы, ничего готового работающего так же быстро для моих условий найти не удалось.СитуацияНужно было забирать периодически пару сотен файлов с ftp-сервера под Windows. Много мелочи и несколько очень крупных по размеру файлов. Суммарно примерно на 500 Гб. Сервер представляет собой vps, расположенный довольно далеко за рубежом. Днем машина высоко нагружена, рано ночью выполняются регламентные работы, итого на скачивание часов 5 максимум. Ни одна из рассмотренных мной утилит не смогла справиться качественно и за отведенное время. Ну что ж, деваться некуда, нормальную систему резервного копирования ещё не купили, а значит ноги в руки вооружаемся редактором или IDE Python и вперёд! За приключениями!КонфигВсе параметры для скрипта вынесем в отдельный файл для удобства.Шаблон конфига:
host = 'ip.ip.ip.ip'
user = 'ftpusername'
passwd = 'ftppassword'
basepath = '/path/to/backup/folder' # Папка, в которой будут созданы подпапки со скачанными файлами
max_threads = 20 # максимальное количество одновременных процессов загрузки
log_path = '\path\to\logfile'
statusfilepath = '\path\to\statusfile'
Конфиг сохраняем с расширением .py и импортируем в начале нашего скрипта. Импортировать можно непосредственно в пространство имён скрипта, но я сделал конструкцию слегка напоминающую костыль в основной части моего скрипта:
if __name__ == "__main__":
host = config.host
user = config.user
passwd = config.passwd
basepath = config.basepath # Папка, в которой будут созданы подпапки со скачанными файлами
max_threads = config.max_threads
log_path = config.log_path
statusfilepath = config.statusfilepath
main()
В начале был списокСкачать файлы с ftp сама по себе задача не сложная, но путём недолгих экспериментов было выяснено, что скачивание файлов занимает время, а таймаут ftp-соединения приходит к нам гораздо быстрее. Следовательно, качать нужно каждый файл в новом соединении, иначе велик риск чего-то потом не досчитаться в скачанных файлах.Для этого нам нужен список этих самых файлов. Ни о каком статичном списке файлов, конечно, речи не идет, значит нам его при каждом выполнении скрипта получать с сервера по-новой.Удобства ради и чтобы не таскать параметры по всему коду - переопределим параметры стандартного класса ftp:
class MyFtp (ftplib.FTP):
"""Класс переопределяет стандартный, чтобы задать все параметры соединение в одном месте"""
def __init__(self):
self.host = host
self.user = user
self.passwd = passwd
self.timeout = 1800
super(MyFtp, self).__init__()
def connect(self):
super(MyFtp, self).connect(self.host, timeout=self.timeout)
def login(self):
super(MyFtp, self).login(user=self.user, passwd=self.passwd)
def quit(self):
super(MyFtp,self).quit()
Параметры берутся из конфига. Конечно же в нужно не забыть импортировать библиотеку ftplib, чтобы этот кусок заработал.Список файлов с сервера мы получим с помощью следующего класса:
class FileList:
"""Класс для работы со списком загружаемых файлов"""
def __init__(self):
self.ftp = None
self.file_list = []
def connect_ftp(self):
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def get_list(self, name):
"""Метод для получения списка всех файлов с ftp-сервера."""
import os
for dirname in self.ftp.mlsd(str(name), facts=["type"]):
if dirname[1]["type"] == "file":
entry_file_list = {}
entry_file_list['remote_path'] = name #путь до файла
entry_file_list['filename'] = dirname[0] #имя файла
self.file_list.append(entry_file_list)
else:
path = os.path.join(name, dirname[0])
self.get_list(path)
def get_next_file(self):
return self.file_list.pop()
def len(self):
return len(self.file_list)
Помимо методов соединения с сервером, получения списка файлов и определения его длины здесь имеет метод, который возвращает нам следующий файл для скачивания, из списка он при этом, конечно, удаляется. ЛогированиеДля ведения логов скачивания будет использовать стандартную библиотеку logging. Создадим класс, который будет заниматься логированием.
class MyLogger:
"""Класс для логирования событий"""
def __init__(self):
self.logger = None
def start_file_logging(self, logger_name, log_path):
"""Обычное логирование в файл"""
import logging
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = logging.FileHandler(log_path)
except FileNotFoundError:
log_path = "downloader.log"
fh = logging.FileHandler(log_path)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5):
"""Логирование в файл с ротацией логов"""
import logging
from logging.handlers import RotatingFileHandler
self.logger = logging.getLogger(logger_name)
self.logger.setLevel(logging.INFO)
try:
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
except FileNotFoundError:
log_path = "downloader.log"
fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
self.logger.addHandler(fh)
def add(self, msg):
self.logger.info(str(msg))
def add_error(self, msg):
self.logger.error(str(msg))
Скрипт будет поддерживать просто логирование в файл и ротацию файловых логов, ибо логи имеют свойство расти непомерно и это надо бы держать под контролем. Скачивание файлаКаждый файл будет скачиваться в отдельном потоке. Класс, скачивающий один конкретный файл с сервера выглядит следующим образом:
class BaseFileDownload(threading.Thread):
""" Объект для копируемого файла """
count = 0
def __init__(self, rpath, filename, log):
threading.Thread.__init__(self)
self.remote_path = rpath
self.filename = filename
self.ftp = None
self.command = None
self.currentpath = None
self.log = log
self.__class__.count += 1 # для подсчета одновременно запущенных закачек
def __del__(self):
self.__class__.count -= 1
def connect(self):
"""Метод для соединения с ftp"""
import sys
self.ftp = MyFtp()
self.ftp.connect()
self.ftp.login()
self.ftp.__class__.encoding = sys.getfilesystemencoding()
def run(self):
"""Запуск потока скачивания"""
import os
self.connect()
self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8')
self.currentpath = os.path.join(basepath, self.remote_path[3:])
self.ftp.cwd(self.remote_path)
if not os.path.exists(self.currentpath):
os.makedirs(self.currentpath, exist_ok=True)
self.host_file = os.path.join(self.currentpath, self.filename)
try:
with open(self.host_file, 'wb') as local_file:
self.log.add("Start downloading " + self.filename)
self.ftp.retrbinary(self.command + self.filename, local_file.write)
self.log.add("Downloading " + self.filename + " complete")
except ftplib.error_perm:
self.log.add_error('Perm error')
self.ftp.quit()
Для подсчета количества одновременно скачиваемых файлов мы будет использовать свойство класса count. В нём у нас будет количество существующих экземпляров класса: в конструкторе счетчик наращивается, в деструкторе, соответственно, уменьшается.Метод для запуска скачивания должен обязательно называться run - это требование библиотеки threading (не забываем её импортировать!), которую мы будем использовать для параллельного запуска нескольких процессов скачивания. При сохранении списка файлов скрипт сохраняет также путь до этого файла, этот путь мы воссоздаем при скачивании с помощью os.makedirs.Статус-файлПо завершению скачивания скрипт будет записывать в файл уведомление об этом. Это уведомление можно мониторить zabbix, чтобы понимать когда бэкап не отработал или, как сделал я - написать простого бота, чтобы периодически проверять статус. Класс для работы с этим файлов выглядит так:
class StatusFile:
"""По окончанию задачи скрипт пишет в файл уведомление о корректном выполнении."""
def __init__(self):
self.msg = ''
def setstatus(self, msg):
global statusfilepath
with open(statusfilepath, 'w') as status_file:
status_file.write(msg)
МногопоточностьНу и, наконец, сама основная функция скрипта, которая осуществляет работу с потоками скачивания:
def main():
import os
import datetime
import time
log = MyLogger()
log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) # запускаем логирование
now = datetime.datetime.today().strftime("%Y%m%d")
global basepath
basepath = os.path.join(basepath, now) # модифицируем путь, добавляя текущую дату
list_file = FileList()
list_file.connect_ftp()
list_file.get_list("..")
for i in range(list_file.len()):
flag = True
while flag: # цикл внутри которого поддерживается нужное количество одновременно запущенных загрузок
if BaseFileDownload.count < max_threads:
curfile = list_file.get_next_file()
threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log)
threadid.start()
flag = False
else:
time.sleep(20)
log.add("Downloading files complete")
statusfile = StatusFile()
statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful")
Здесь мы запускаем логирование, получаем список файлов ( он хранится в памяти). В вечном цикле while мы проверяем количество одновременно запущенных скачиваний и, при необходимости, запускаем дополнительные потоки.Исходный код целиком можно найти здесь.
===========
Источник:
habr.com
===========
Похожие новости:
- [Python, Машинное обучение, Контент-маркетинг, Искусственный интеллект, Социальные сети и сообщества] Нейросеть для раскрутки собачьего аккаунта в Инстаграм или робопёс в действии
- [Python, FPGA] Прокачиваем скрипты симуляции HDL с помощью Python и PyTest
- [Python, Машинное обучение, Искусственный интеллект] Распознавание Ворониных на фотографиях: от концепции к делу
- [Python, API, 1С-Битрикс] Как быстро получить много данных от Битрикс24 через REST API
- [Python, Программирование, Проектирование и рефакторинг, Профессиональная литература] Как определять собственные классы исключений в Python (перевод)
- [Python, Программирование, Искусственный интеллект] Constraint Programming или как решить задачу коммивояжёра, просто описав её (перевод)
- [Python, Голосовые интерфейсы] Голосовой ассистент на Python (Виталий alfa 2.0)
- [Python] Поиск известных паттернов на Forex с Python
- [Python, Big Data, Разработка под e-commerce, Управление продуктом] Как мы в СберМаркете боремся с товарами-призраками
- [Python, API, Браузеры, Веб-аналитика] Скрапинг современных веб-сайтов без headless-браузеров (перевод)
Теги для поиска: #_python, #_ftp, #_python, #_multithreading, #_python
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 20:15
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Зачем это нужно?Однажды передо мной встала задача копирования большого количества файлов с ftp-сервера. Нужно было делать бэкап. Казалось бы, что может быть проще! Но увы, ничего готового работающего так же быстро для моих условий найти не удалось.СитуацияНужно было забирать периодически пару сотен файлов с ftp-сервера под Windows. Много мелочи и несколько очень крупных по размеру файлов. Суммарно примерно на 500 Гб. Сервер представляет собой vps, расположенный довольно далеко за рубежом. Днем машина высоко нагружена, рано ночью выполняются регламентные работы, итого на скачивание часов 5 максимум. Ни одна из рассмотренных мной утилит не смогла справиться качественно и за отведенное время. Ну что ж, деваться некуда, нормальную систему резервного копирования ещё не купили, а значит ноги в руки вооружаемся редактором или IDE Python и вперёд! За приключениями!КонфигВсе параметры для скрипта вынесем в отдельный файл для удобства.Шаблон конфига: host = 'ip.ip.ip.ip'
user = 'ftpusername' passwd = 'ftppassword' basepath = '/path/to/backup/folder' # Папка, в которой будут созданы подпапки со скачанными файлами max_threads = 20 # максимальное количество одновременных процессов загрузки log_path = '\path\to\logfile' statusfilepath = '\path\to\statusfile' if __name__ == "__main__":
host = config.host user = config.user passwd = config.passwd basepath = config.basepath # Папка, в которой будут созданы подпапки со скачанными файлами max_threads = config.max_threads log_path = config.log_path statusfilepath = config.statusfilepath main() class MyFtp (ftplib.FTP):
"""Класс переопределяет стандартный, чтобы задать все параметры соединение в одном месте""" def __init__(self): self.host = host self.user = user self.passwd = passwd self.timeout = 1800 super(MyFtp, self).__init__() def connect(self): super(MyFtp, self).connect(self.host, timeout=self.timeout) def login(self): super(MyFtp, self).login(user=self.user, passwd=self.passwd) def quit(self): super(MyFtp,self).quit() class FileList:
"""Класс для работы со списком загружаемых файлов""" def __init__(self): self.ftp = None self.file_list = [] def connect_ftp(self): import sys self.ftp = MyFtp() self.ftp.connect() self.ftp.login() self.ftp.__class__.encoding = sys.getfilesystemencoding() def get_list(self, name): """Метод для получения списка всех файлов с ftp-сервера.""" import os for dirname in self.ftp.mlsd(str(name), facts=["type"]): if dirname[1]["type"] == "file": entry_file_list = {} entry_file_list['remote_path'] = name #путь до файла entry_file_list['filename'] = dirname[0] #имя файла self.file_list.append(entry_file_list) else: path = os.path.join(name, dirname[0]) self.get_list(path) def get_next_file(self): return self.file_list.pop() def len(self): return len(self.file_list) class MyLogger:
"""Класс для логирования событий""" def __init__(self): self.logger = None def start_file_logging(self, logger_name, log_path): """Обычное логирование в файл""" import logging self.logger = logging.getLogger(logger_name) self.logger.setLevel(logging.INFO) try: fh = logging.FileHandler(log_path) except FileNotFoundError: log_path = "downloader.log" fh = logging.FileHandler(log_path) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') fh.setFormatter(formatter) self.logger.addHandler(fh) def start_rotate_logging(self, logger_name, log_path, max_bytes=104857600, story_backup=5): """Логирование в файл с ротацией логов""" import logging from logging.handlers import RotatingFileHandler self.logger = logging.getLogger(logger_name) self.logger.setLevel(logging.INFO) try: fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup) except FileNotFoundError: log_path = "downloader.log" fh = RotatingFileHandler(log_path, maxBytes=max_bytes, backupCount=story_backup) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') fh.setFormatter(formatter) self.logger.addHandler(fh) def add(self, msg): self.logger.info(str(msg)) def add_error(self, msg): self.logger.error(str(msg)) class BaseFileDownload(threading.Thread):
""" Объект для копируемого файла """ count = 0 def __init__(self, rpath, filename, log): threading.Thread.__init__(self) self.remote_path = rpath self.filename = filename self.ftp = None self.command = None self.currentpath = None self.log = log self.__class__.count += 1 # для подсчета одновременно запущенных закачек def __del__(self): self.__class__.count -= 1 def connect(self): """Метод для соединения с ftp""" import sys self.ftp = MyFtp() self.ftp.connect() self.ftp.login() self.ftp.__class__.encoding = sys.getfilesystemencoding() def run(self): """Запуск потока скачивания""" import os self.connect() self.command = str(bytes('RETR ', encoding='latin-1'), encoding='utf-8') self.currentpath = os.path.join(basepath, self.remote_path[3:]) self.ftp.cwd(self.remote_path) if not os.path.exists(self.currentpath): os.makedirs(self.currentpath, exist_ok=True) self.host_file = os.path.join(self.currentpath, self.filename) try: with open(self.host_file, 'wb') as local_file: self.log.add("Start downloading " + self.filename) self.ftp.retrbinary(self.command + self.filename, local_file.write) self.log.add("Downloading " + self.filename + " complete") except ftplib.error_perm: self.log.add_error('Perm error') self.ftp.quit() class StatusFile:
"""По окончанию задачи скрипт пишет в файл уведомление о корректном выполнении.""" def __init__(self): self.msg = '' def setstatus(self, msg): global statusfilepath with open(statusfilepath, 'w') as status_file: status_file.write(msg) def main():
import os import datetime import time log = MyLogger() log.start_rotate_logging("DownloaderLog", os.path.join(log_path, "download_backup.log")) # запускаем логирование now = datetime.datetime.today().strftime("%Y%m%d") global basepath basepath = os.path.join(basepath, now) # модифицируем путь, добавляя текущую дату list_file = FileList() list_file.connect_ftp() list_file.get_list("..") for i in range(list_file.len()): flag = True while flag: # цикл внутри которого поддерживается нужное количество одновременно запущенных загрузок if BaseFileDownload.count < max_threads: curfile = list_file.get_next_file() threadid = BaseFileDownload(curfile["remote_path"], curfile["filename"], log) threadid.start() flag = False else: time.sleep(20) log.add("Downloading files complete") statusfile = StatusFile() statusfile.setstatus("Downloading at " + str(datetime.datetime.now()) + " finishing successful") =========== Источник: habr.com =========== Похожие новости:
|
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 20:15
Часовой пояс: UTC + 5