[Python] Простой P2P сервер на python

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
06-Авг-2020 17:32


Одноранговая сеть или проще P2P сеть — это сеть в которой все пользователи равны и имеют равные права. Отличительная особенность таких сетей от обычных в том, что в ней нет единого сервера, к которому подключаются пользователи, вместо этого они подключаются друг к другу. Существуют гибридные варианты таких сетей, в котором присутствует сервер, выполняющий только координирующую работу.
Сегодня я хочу предложить простой вариант реализации P2P сервера для такой сети на языке python.
Предыстория
На 1-ом курсе обучения в вузе мне преподаватель по программированию предложил написать мне децентрализованный чат. Язык и модули я выбирал сам. Такое предложение меня сразу заинтересовало, тем более я давно хотел начать изучать python, да и мог не посещать её пары. Недолго думая я согласился и принялся к работе.
Самым трудным для меня оказалось написать серверную часть. Если интерфейс я написал практически сразу, на него я потратил около порядка 2 дней, то над серверной частью мне пришлось подумать. Думал я неделю, но зато на следующий день за пару часов написал рабочий P2P сервер для своего чата.
Сервер
Исходный код всего сервера расположен ниже.

Импорты

SPL
import socket
import rsa
from threading import Thread
from time import sleep
import datetime

Сервер

SPL
# P2P сервер
class P2P:
    def __init__(self, _port: int, _max_clients: int = 1):
        # Индикатор работы сервера
        self.running = True
        # Порт сервера
        self.port = _port
        # Максимальное кол-во подключений
        self.max_clients = _max_clients
        # Подключённые пользователи
        self.clients_ip = ["" for i in range(self.max_clients)]
        # Словарь с входящими сообщениями
        self.incoming_requests = {}
        # Логи клиентов
        self.clients_logs = [Log for i in range(self.max_clients)]
        # Клиентские соккеты
        self.client_sockets = [socket.socket() for i in range(self.max_clients)]
        # Таймауты клиентов
        for i in self.client_sockets:
            i.settimeout(0.2)
        # Ключи для шифрования исходящих сообщений
        self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
        # Ключи для дешифрования входящих сообщений
        self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
        # Информация загруженности соккетов
        self.socket_busy = [False for i in range(self.max_clients)]
        # Чёрный список
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        # Серверный соккет
        self.server_socket = socket.socket()
        # Таймаут сервера
        self.server_socket.settimeout(0.2)
        # Бинд сервера
        self.server_socket.bind(('localhost', _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")
    # server control
    # Создаёт сессию с этим пользователем
    def create_session(self, _address: str):
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        try:
            self.__add_user(_address)
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            thread.join(0)
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            self.__del_user(_address)
            return
        my_key = rsa.newkeys(512)
        self.raw_send(_address, my_key[0].save_pkcs1())
        key = connection.recv(162).decode()
        self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
        key = rsa.PublicKey.load_pkcs1(key)
        self.__add_keys(_address, key, my_key[1])
        while self.running and self.socket_busy[ind]:
            try:
                data = connection.recv(2048)
            except socket.timeout:
                continue
            except OSError:
                self.close_connection(_address)
                return
            if data:
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        try:
            self.close_connection(_address)
        except TypeError or KeyError:
            pass
    # Подключается к пользователю
    def __connect(self, _address: str, *args):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].connect((_address, self.port))
            self.socket_busy[ind] = True
            return True
        except OSError:
            return False
    # Перезагружает соккет
    def __reload_socket(self, _ind: int):
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.socket_busy[_ind] = False
    # Закрывает соединение
    def close_connection(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)
    # Останавливает сервер
    def kill_server(self):
        self.running = False
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for i in self.client_sockets:
            i.close()
        for i in self.clients_logs:
            try:
                i.kill_log()
            except TypeError:
                pass
    # Отправляет сообщение с шифрованием
    def send(self, _address: str, _message: str):
        ind = self.__get_ind_by_address(_address)
        try:
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.client_sockets[ind].send(rsa.encrypt(_message.encode(), self.keys[ind]))
            self.log.save_data("Send message to {}".format(_address))
        except OSError:
            self.log.save_data("Can`t send message to {}".format(_address))
    # Отправляет сообщение без шифрования
    def raw_send(self, _address: str, _message: bytes):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].send(_message)
            self.clients_logs[ind].save_data("to {}: {}".format(_address, _message))
            self.log.save_data("Raw send message to {}".format(_address))
        except OSError:
            self.log.save_data("Raw send to {} Failed".format(_address))
    # add
    # Добавляет пользователя
    def __add_user(self, _address: str):
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))
    # Добавляет ключ для шифрования и дешифрования адресу
    def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        ind = self.__get_ind_by_address(_address)
        try:
            self.keys[ind] = _key
            self.my_keys[ind] = _my_key
        except TypeError:
            return
    # Добавляет входящее сообщение от адреса
    def __add_request(self, _address: str, _message: bytes):
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))
    # get
    # Возвращает индекс первого свободного соккета
    # if self.__get_free_socket() is not None: *
    def __get_free_socket(self):
        for i in range(len(self.socket_busy)):
            if not self.socket_busy[i]:
                return i
        return None
    # Возвращает номер индекса, к которому подключён адрес
    def __get_ind_by_address(self, _address: str):
        for i in range(len(self.clients_ip)):
            if self.clients_ip[i] == _address:
                return i
        else:
            return None
    # Возвращает входящее сообщение от адреса
    def get_request(self, _address: str):
        data = self.incoming_requests[_address][0]
        self.incoming_requests[_address] = [self.incoming_requests[_address][i]
                                            for i in range(1, len(self.incoming_requests[_address]))]
        return data
    # check
    # Проверяет наличие входящих сообщения от пользователя
    # if self.check_request(_address): *
    def check_request(self, _address: str):
        return bool(self.incoming_requests.get(_address))
    # return True if you already connected to _address else False
    def check_address(self, _address: str):
        return True if _address in self.clients_ip else False
    # del
    # Удаляет пользователя
    def __del_user(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = Log
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))
    # Удаляет пользователя
    def __del_key(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = rsa.key.PublicKey
        self.my_keys[ind] = rsa.key.PrivateKey
    # others
    # Возвращает число подключённых пользователей
    def __len__(self):
        num = 0
        for i in self.clients_ip:
            if i != "":
                num += 1
        return num
    # возвращает Правду если есть хотя бы одно подключение
    def __bool__(self):
        for i in self.clients_ip:
            if i != "":
                return True
        return False

Лог

SPL
class Log:
    def __init__(self, _name: str):
        self.name = _name
        try:
            self.file = open(_name, "a")
        except FileNotFoundError:
            self.file = open(_name, "w")
        self.save_data("Log started at " + str(datetime.datetime.now()))
        self.file.close()
    # Сохраняет информацию в файл
    def save_data(self, _data: str):
        self.file = open(self.name, "a")
        self.file.write("{}\n".format(_data))
        self.file.close()
    # Возвращает данные из файла в виде листа
    @staticmethod
    def read_and_return_list(_name: str):
        try:
            file = open(_name, "r")
        except FileNotFoundError:
            return []
        data = file.read()
        return data.split("\n")
    # Останавливает лог
    def kill_log(self):
        self.file = open(self.name, "a")
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
        self.file.close()

А теперь приступим к разбору и объяснению. Все серверные функции мы разделим на условные категории в зависимости от того, что они делают:
  • инициализация
  • add функции
  • del функции
  • check функции
  • get функции
  • server control функции
  • Другие функции

Инициализация

init

SPL
def __init__(self, _port: int, _max_clients: int = 1):
        # Индикатор работы сервера
        self.running = True
        # Порт сервера
        self.port = _port
        # Максимальное кол-во подключений
        self.max_clients = _max_clients
        # Подключённые пользователи
        self.clients_ip = ["" for i in range(self.max_clients)]
        # Словарь с входящими сообщениями
        self.incoming_requests = {}
        # Логи клиентов
        self.clients_logs = [Log for i in range(self.max_clients)]
        # Клиентские соккеты
        self.client_sockets = [socket.socket() for i in range(self.max_clients)]
        # Таймауты клиентов
        for i in self.client_sockets:
            i.settimeout(0.2)
        # Ключи для шифрования исходящих сообщений
        self.keys = [rsa.key.PublicKey for i in range(self.max_clients)]
        # Ключи для дешифрования входящих сообщений
        self.my_keys = [rsa.key.PrivateKey for i in range(self.max_clients)]
        # Информация загруженности соккетов
        self.socket_busy = [False for i in range(self.max_clients)]
        # Чёрный список
        self.blacklist = ["127.0.0.1"] + Log.read_and_return_list("blacklist.txt")
        # Серверный соккет
        self.server_socket = socket.socket()
        # Таймаут сервера
        self.server_socket.settimeout(0.2)
        # Бинд сервера
        self.server_socket.bind(('localhost', _port))
        self.server_socket.listen(self.max_clients)
        self.log = Log("server.log")
        self.log.save_data("Server initialized")

Для инициализации сервера запросим порт, на котором будем запускать сервер и максимальное кол-во подключений, по умолчанию 1. Сам сервер будет хранить такие данные:
  • Индикатор работы
  • Порт
  • Максимальное кол-во соединений

Листы с длинной равной максимальному количеству пользователей:
  • Ip подключённых клиентов
  • Клиентские соккеты
  • Ключи для шифрования
  • Ключи для дешифрования
  • Индикатор загруженности соккетов

Также объявим чёрный список адресов, который будет загружаться из файла и постоянно содержать адрес "127.0.0.1" во избежание "двойного подключения" к себе самому ( localhost всё ещё доступен), и словарь, который будет хранить входящие сообщения. И нужно установить серверному соккету максимальное кол-во подключений командой listen().
add функции
Все функции в этой категории будут работать только внутри класса. Неправильное обращение с ними может вызвать неправильную работу сервера.
Функция add_user добавляет указанный адрес в рабочие листы сервера, а также запускает лог диалога с пользователем.

add_user

SPL
def __add_user(self, _address: str):
        ind = self.__get_free_socket()
        self.clients_logs[ind] = Log("{}.log".format(_address))
        self.clients_ip[ind] = _address
        self.incoming_requests[_address] = []
        self.log.save_data("Added user {}".format(_address))

Функция add_keys добавляет ключи для шифрования и дешифрования указанному адресу.

add_keys

SPL
def __add_keys(self, _address: str, _key: rsa.key.PublicKey, _my_key: rsa.key.PrivateKey):
        ind = self.__get_ind_by_address(_address)
        try:
            self.keys[ind] = _key
            self.my_keys[ind] = _my_key
        except TypeError:
            return

И последняя функция add_request добавляет в словарь входящих сообщений сообщение от указанного адреса.

add_request

SPL
def __add_request(self, _address: str, _message: bytes):
        self.incoming_requests[_address].append(_message.decode())
        self.clients_logs[self.__get_ind_by_address(_address)].save_data("from {}: {}".format(_address, str(_message)))
        self.log.save_data("Get incoming message from {}".format(_address))

del функции
Эти функции также как и прошлые работают с внутренними данными сервера. Но они, в отличии от прошлых, удаляют данные а не добавляют.
Функция del_user противоположна функции add_user. Она удаляет всё, что связано с указанным адресом с сервером, а также закрывает лог.

del_user

SPL
def __del_user(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.clients_logs[ind].kill_log()
        self.clients_logs[ind] = Log
        self.clients_ip[ind] = ""
        self.incoming_requests.pop(_address)
        self.log.save_data("Deleted user {}".format(_address))

Функция del_key удаляет ключи для шифрования и дешифрования указанного адреса.

del_key

SPL
def __del_key(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.keys[ind] = rsa.key.PublicKey
        self.my_keys[ind] = rsa.key.PrivateKey

check функции
Эти функции направлены на получение информации о данных на сервере.
Функция check_request проверяет наличие входящих сообщение от указанного адреса и возвращает его в виде True при наличии или False при отсутствии.

check_request

SPL
def check_request(self, _address: str):
        return bool(self.incoming_requests.get(_address))

Функции check_address проверяет есть ли указанный адрес среди подключённых пользователей или нет и возвращает True, если он есть или False, если его нет.

check_address

SPL
def check_address(self, _address: str):
        return True if _address in self.clients_ip else False

get функции
Функция get_free_socket только для внутренней работы сервера и возвращает индекс свободного соккета, если такие есть, иначе ничего.

get_free_socket

SPL
def __get_free_socket(self):
        for i in range(len(self.socket_busy)):
            if not self.socket_busy[i]:
                return i
        return None

Функция get_ind_by_address тоже только для внутренней работы, она возвращает номер соккета, к которому подключён данный адрес или ничего, если адрес никуда не подключён.

get_ind_by_address

SPL
def __get_ind_by_address(self, _address: str):
        for i in range(len(self.clients_ip)):
            if self.clients_ip[i] == _address:
                return i
        else:
            return None

И последняя функция get_request возвращает первое сообщение от указанного адреса и удаляет его из сервера. Она выкинет ошибку, если сообщений нет вообще.

get_request

SPL
def get_request(self, _address: str):
        data = self.incoming_requests[_address][0]
        self.incoming_requests[_address] = [self.incoming_requests[_address][i]
                                            for i in range(1, len(self.incoming_requests[_address]))]
        return data

server control функции
Это основные функции работы сервера, в них заключается логика работы сервера.
Одна из самых важных функций — create_session — она устанавливает соединение с указанным адресом. Здесь осуществляется проверка наличия адреса в в чёрном списке, загруженность соккетов, осуществляется обмен ключами шифрования при успешном подключении и запускается цикл прослушивания соккета, который получает сообщени я и работает с ними.

create_session

SPL
def create_session(self, _address: str):
        self.log.save_data("Creating session with {}".format(_address))
        ind = self.__get_free_socket()
        if _address in self.blacklist:
            self.log.save_data("{} in blacklist".format(_address))
            return
        if ind is None:
            self.log.save_data("All sockets are busy, can`t connect to {}".format(_address))
            return
        try:
            self.__add_user(_address)
            thread = Thread(target=self.__connect, args=(_address, 1))
            thread.start()
            thread.join(0)
            connection, address = self.server_socket.accept()
            connection.settimeout(0.2)
        except OSError:
            self.log.save_data("Failed to create session with {}".format(_address))
            self.__del_user(_address)
            return
        my_key = rsa.newkeys(512)
        self.raw_send(_address, my_key[0].save_pkcs1())
        key = connection.recv(162).decode()
        self.clients_logs[ind].save_data("from {}: {}".format(_address, key))
        key = rsa.PublicKey.load_pkcs1(key)
        self.__add_keys(_address, key, my_key[1])
        while self.running and self.socket_busy[ind]:
            try:
                data = connection.recv(2048)
            except socket.timeout:
                continue
            except OSError:
                self.close_connection(_address)
                return
            if data:
                data = rsa.decrypt(data, self.my_keys[ind])
                self.__add_request(_address, data)
        try:
            self.close_connection(_address)
        except TypeError or KeyError:
            pass

Функция connect осуществляет подключение к пользователю с указанным адресом и возвращает True при успехе или False при неудаче. Использовать её стоит только внутри сервера.

connect

SPL
def __connect(self, _address: str, *args):
        ind = self.__get_ind_by_address(_address)
        try:
            self.client_sockets[ind].connect((_address, self.port))
            self.socket_busy[ind] = True
            return True
        except OSError:
            return False

Функция close_connection закрывает соединение с указанным адресом.

close_connection

SPL
def close_connection(self, _address: str):
        ind = self.__get_ind_by_address(_address)
        self.__del_key(_address)
        self.__reload_socket(ind)
        self.__del_user(_address)

Функция kill_server полностью выключает сервер.

kill_server

SPL
def kill_server(self):
        self.running = False
        sleep(1)
        self.server_socket.close()
        self.log.kill_log()
        for i in self.client_sockets:
            i.close()
        for i in self.clients_logs:
            try:
                i.kill_log()
            except TypeError:
                pass

И последняя функция reload_socket, предназначенная для использования внутри самого сервера, перезагружает соккет с указанным индексом.

reload_socket

SPL
def __reload_socket(self, _ind: int):
        self.client_sockets[_ind].close()
        self.client_sockets[_ind] = socket.socket()
        self.socket_busy[_ind] = False

Другие функции
Функция bool возвращает True, если есть хоть какое-нибудь подключение, или False, если таких нет.

bool

SPL
def __bool__(self):
        for i in self.clients_ip:
            if i != "":
                return True
        return False

Функция len возвращает количество подключённых к серверу клиентов.

len

SPL
def __len__(self):
        num = 0
        for i in self.clients_ip:
            if i != "":
                num += 1
        return num

Лог
Также стоить написать небольшой лог для сервера, который будет документировать процесс работы сервера и процесс обмена сообщения между пользователями. Стоит сказать, что открытие и закрытие файла при каждой записи необходимо в данном случае. Так как при вылете сервера или программы, где он задействован может потребоваться проверить лог работы сервера и данные сохранятся только в таком случае.
А теперь разберём функции на пальцах, тем более здесь ничего сложного нет.
Для инициализации сервера потребуется только имя файла. Сначала мы попробуем открыть файл на до запись, но если такого файл нет, то создаст его. Сразу же в файл запишем время старта лога.

init

SPL
def __init__(self, _name: str):
        self.name = _name
        try:
            self.file = open(_name, "a")
        except FileNotFoundError:
            self.file = open(_name, "w")
        self.save_data("Log started at " + str(datetime.datetime.now()))
        self.file.close()

Функция save_data сохраняет в файл указанное сообщение.

save_data

SPL
def save_data(self, _data: str):
        self.file = open(self.name, "a")
        self.file.write("{}\n".format(_data))
        self.file.close()

Статическая функция read_and_return_list не требует объекта класса для использования, но требует для своей работы имя файла из которого будет взята вся информация и возвращена в виде листа.

read_and_return_list

SPL
@staticmethod
    def read_and_return_list(_name: str):
        try:
            file = open(_name, "r")
        except FileNotFoundError:
            return []
        data = file.read()
        return data.split("\n")

И последняя функция kill_log записывает в файл время остановки лога и закрывает файл.

kill_log

SPL
def kill_log(self):
        self.file = open(self.name, "a")
        self.save_data("Log stopped at {}\n".format(datetime.datetime.now()))
        self.file.close()

Заключение
Написать сервер для одноранговой сети не сложно, но, с другой стороны, есть куда двигаться. Можно реализовать отправку файлов и сообщений, скачивание их по частям сразу от нескольких пользователей. Можно усовершенствовать лог или добавить шифрование на отправку ключей для шифрования и дешифрования.
Если есть какие-либо предложения по этому поводу или варианты улучшения кода буду рад почитать в комментариях.
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_python, #_chat (Чат), #_python, #_python_class, #_socket, #_python
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 18:15
Часовой пояс: UTC + 5