[Информационная безопасность, Обработка изображений] Сим-сим откройся: как я научил дверь своего подъезда узнавать меня в лицо

Автор Сообщение
news_bot ®

Стаж: 6 лет 9 месяцев
Сообщений: 27286

Создавать темы news_bot ® написал(а)
07-Июн-2021 20:34

Пятничный рабочий день на удалёнке уже подходил к концу, как в дверь постучали, чтобы сообщить об установке нового домофона. Узнав, что новый домофон имеет мобильное приложение, позволяющее отвечать на звонки не находясь дома, я заинтересовался и сразу же загрузил его на свой телефон. Залогинившись, я обнаружил интересную особенность этого приложения — даже без активного вызова в мою квартиру я мог смотреть в камеру домофона и открывать дверь в произвольный момент времени. "Да это же онлайн АРI к двери подъезда!" — щёлкнуло в голове. Судьба предстоящих выходных была предрешена.Видеодемонстрация в конце статьи.
Кадр из фильма «Пятый Элемент»ДисклеймерИспользуемая технология аутентификации на основании распознавания лиц не учитывает множества сценариев атак и недостаточна для обеспечения безопасности. Описанная система анализа кадров была активна только в моменты, когда кому-то из моей семьи было необходимо попасть в подъезд — она не совершала автоматическую обработку лиц других людей и не могла привести к допуску в подъезд третьих лиц.Получение APIЧтобы автоматизировать управление дверью, необходимо понять, куда и в каком формате отправляет запросы само приложение. Реверс-инжиниринг — дело неоднозначное, поэтому я попытался обойтись без него и вместо этого просто перехватить свой собственный трафик. Для этой задачи я взял НТТР Тооlkit — комплекс программ, который позволяет наладить прослушивание http(s) запросов собственного Android устройства.Первая попытка оказывается провальной — после установки на телефон Android-части инструментария и сгенерированного Certificate authority оказалось, что мобильное приложение домофона не доверяет пользовательским СА. Согласно документации, начиная с Android 7 манифест приложения должен явно изъявлять такое желание.Так как мой телефон не поддерживает root доступ для модификации списка системных СА, я воспользовался официальным эмулятором Android, идущим в комплекте с Android Studio. После запуска эмулятора и перехвата с помощью ADB меня встретило радостное сообщение о том, что трафик от всех приложений без Certificate pinning будет успешно расшифрован.
Успешное соединение с HTTP ToolkitК счастью, приложение оказалось как раз из таких — немного побродив по приложению и открыв дверь, можно переходить к анализу логов.
Запрос открытия двериВсего интересными показалось три запроса:
  • Запрос на открытие двери подъезда: POST по адресу /rest/v1/places/{place_id}/accesscontrols/{control_id}/actions с JSON-телом {"name": "accessControlOpen"}
  • Получение снимка (превью) с камеры: GET по адресу /rest/v1/places/{place_id}/accesscontrols/{control_id}/videosnapshots
  • Получение ссылки на видеопоток с аудио: GET по адресу /rest/v1/forpost/cameras/{camera_id}/video?LightStream=0
HTTP заголовки всех трёх запросов содержат ключ Authorization — судя по всему, именно по нему происходит авторизация при выполнении запросов. Сделав пару запросов руками через Advanced REST Client, чтобы убедиться, что заголовка Authorization достаточно и в самостоятельной работе с API не осталось подводных камней, я понял, что можно откладывать эмулятор и переходить к написанию кода.Вооружившись Python и requests, я написал необходимые для следующих этапов функции исполнения каждого из трёх действий:
HEADERS = {"Authorization": "Bearer ###"}
ACTION_URL = "https://###.ru/rest/v1/places/###/accesscontrols/###/"
VIDEO_URL = "https://###.ru/rest/v1/forpost/cameras/###/video?LightStream=0"
def get_image():
    result = requests.get(f'{ACTION_URL}/videosnapshots', headers=HEADERS)
    if result.status_code != 200:
        logging.error(f"Failed to get an image with status code {result.status_code}")
        return None
    logging.warning(f"Image received successfully in {result.elapsed.total_seconds()}sec")
    return result.content
def open_door():
    result = requests.post(
        f'{ACTION_URL}/actions', headers=HEADERS, json={"name": "accessControlOpen"})
    if result.status_code != 200:
        logging.error(f"Failed to open the door with status code {result.status_code}")
        return False
    logging.warning(f"Door opened successfully in {result.elapsed.total_seconds()}sec")
    return True
def get_videostream_link():
    result = requests.get(VIDEO_URL, headers=HEADERS)
    if result.status_code != 200:
        logging.error(f"Failed to get stream link with status code {result.status_code}")
        return False
    logging.warning(f"Stream link received successfully in {result.elapsed.total_seconds()}sec")
    return result.json()['data']['URL']
Поиск знакомых лиц в кадреПрежде чем начать этот раздел, нужно рассказать пару слов об уже имеющихся на тот момент у меня в распоряжении серверных мощностях — это недорогая виртуальная машина с доступом ко всего одному потоку Intel(R) Xeon(R) CPU E5-2650L v3 @ 1.80GHz, 1GB оперативной памяти и 0 GPU. Тратиться на более дорогую конфигурацию не хотелось, а значит, нужно было попробовать выжать максимум из имеющихся ресурсов.На данном этапе проектом заинтересовалась моя жена, хорошо разбирающаяся в машинном обучении и в итоге взявшая на себя эту задачу. Для этой работы был выбран OpenVINO Toolkit — инструментарий от Intel, в том числе заточенный как раз на запуск моделей машинного обучения на CPU.Непродолжительный поиск существующих решений привёл на страницу Interactive Face Recognition Demo — официального демо, показывающего ровно необходимый функционал сравнения видимых в кадре лиц с базой заранее сохранённых. Единственная проблема состояла в том, что данный пример по каким-то причинам исчез после релиза 2020.3, а удобная установка пакета через pip у проекта появилась только с 2021.1. Было решено установить последнюю версию OpenVINO и адаптировать код под неё.К счастью, гит помнит всё и получить из репозитория проекта код демо и нужные модели не составило труда. После удаления всей работы с командной строкой (взяв для всего значения по умолчанию), визуализацией и видеопотоком вебкамеры, был получен класс, способный распознавать лица в конкретном кадре:
class ImageProcessor:
    def __init__(self):
        self.frame_processor = FrameProcessor()
    def process(self, image):
        detections = self.frame_processor.process(image)
        labels = []
        for roi, landmarks, identity in zip(*detections):
            label = self.frame_processor.face_identifier.get_identity_label(
                identity.id)
            labels.append(label)
        return labels
После создания базы лиц из десятка селфи можно было переходить к тестированию фактической производительности полученного решения на имеющемся железе. В качестве подопытных картинок я взял фотографию себя и фото пустой улицы, сделанные на домофонную камеру функцией get_image() выше.
100 runs on an image with known face:
Total time: 7.356s
Time per frame: 0.007s
FPS: 135.944
100 runs on an image without faces:
Total time: 2.985s
Time per frame: 0.003s
FPS: 334.962
Очевидно, что показанная производительность с запасом покрывает нужды детектирования лиц в реальном времени.1 FPS: Работа со снимками с камерыИтак, на этом этапе у меня уже был класс для поиска заданных лиц в кадре, а также функции получения снимка с камеры и ссылки на видеопоток. С видео на тот момент я никогда не работал, потому для MVP решил переложить работу по выделению кадров на сервер и снова воспользоваться get_image().
class ImageProcessor:
    # <...>
    def process_single_image(self, image):
        nparr = np.fromstring(image, np.uint8)
        img_np = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        labels = self.process(img_np)
        return labels
def snapshot_based_intercom_id():
    processor = ImageProcessor()
    last_open_door_time = time.time()
    while True:
        start_time = time.time()
        image = get_image()
        result = processor.process_single_image(image)
        logging.info(f'{result} in {time.time() - start_time}s')
        # Successfull detections are "face{N}"
        if any(['face' in res for res in result]):
            if start_time - last_open_door_time > 5:
                open_door()
                with open(f'images/{start_time}_OK.jfif', 'wb') as f:
                    f.write(image)
                last_open_door_time = start_time
Цикл получает картинки от домофона, ищет на них знакомые лица, открывает дверь в случае успешной детекции и сохраняет фото на память. Важно было не забыть ограничить частоту отправки команды открытия двери, т.к. лицо обычно распознаётся ещё на подходах к двери.
Момент успешного распознавания, версия с обработкой отдельных снимковЗаработало! Полностью довольный первым запуском, я вернулся в квартиру. Единственное, что портило впечатление от системы распознавания — время реакции на появление лица в кадре, т.к. время отклика API оставляло желать лучшего. Низкая частота поступления данных, 0.7с на получение картинки и 0.6с на открытие двери, давали видимый невооружённым взглядом лаг.До 30 FPS: Обработка видеопотокаПолучить кадры из видео оказалось на удивление просто:
vcap = cv2.VideoCapture(link)
success, frame = vcap.read()
Замеры показали, что камера домофона способна выдавать стабильные 30 FPS. Проблемой оказалось поддержание меньшей частоты кадров: метод read() возвращает последний необработанный кадр из внутренней очереди кадров. Если эта очередь наполняется видеопотоком быстрее, чем вычитывается, то обрабатываемые кадры начинают отставать от реального времени, что приводит к нежелательной задержке. Дополнительно, с практической точки зрения, распознавать лица 30 раз в секунду — пустая трата вычислительных ресурсов, так как обычно люди подходят к двери подъезда с небольшой скоростью.Первым потенциальным решением было установить размер внутренней очереди: vcap.set(CV_CAP_PROP_BUFFERSIZE, 0);. Согласно найденной информации, такой трюк должен был хорошо работать с любой конфигурацией системы для версий OpenCV выше 3.4, но по какой-то причине, так и не оказал никакого влияния в моём случае. Единственной рабочей альтернативой стал подход, описанный в этом ответе со StackOverflow — завести отдельный поток, читающий кадры из камеры на максимально возможной скорости и сохраняющий последний в поле класса для дальнейшего доступа (впоследствии оказалось, что именно этот цикл ответственен за большую часть потребления процессора).Получилась модификация ImageProcessor для обработки видеопотока с частотой 3 кадра в секунду:
class CameraBufferCleanerThread(threading.Thread):
    def __init__(self, camera, name='camera-buffer-cleaner-thread'):
        self.camera = camera
        self.last_frame = None
        self.finished = False
        super(CameraBufferCleanerThread, self).__init__(name=name)
        self.start()
    def run(self):
        while not self.finished:
            ret, self.last_frame = self.camera.read()
    def __enter__(self): return self
    def __exit__(self, type, value, traceback):
        self.finished = True
        self.join()
class ImageProcessor:
    # <...>
    def process_stream(self, link):
        vcap = cv2.VideoCapture(link)
        interval = 0.3 # ~3 FPS
        with CameraBufferCleanerThread(vcap) as cam_cleaner:
            while True:
                frame = cam_cleaner.last_frame
                if frame is not None:
                    yield (self.process(frame), frame)
                else:
                    yield (None, None)
                time.sleep(interval)
И соответствующая модификация snapshot_based_intercom_id:
def stream_based_intercom_id():
    processor = ImageProcessor()
    link = get_videostream_link()
    # To notify about delays
    last_time = time.time()
    last_open_door_time = time.time()
    for result, np_image in processor.process_stream(link):
        current_time = time.time()
        delta_time = current_time - last_time
        if delta_time < 1:
            logging.info(f'{result} in {delta_time}')
        else:
            logging.warning(f'{result} in {delta_time}')
        last_time = current_time
        if result is None:
            continue
        if any(['face' in res for res in result]):
            if current_time - last_open_door_time > 5:
                logging.warning(
                    f'Hey, I know you - {result[0]}! Opening the door...')
                last_open_door_time = current_time
                open_door()
                cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)
Тест в реальных условиях показал ощутимое падение времени отклика — при хорошем освещении подъездная дверь оказывалась открытой ещё до того, как я успевал подойти к ней вплотную.
Момент успешного распознавания, версия с обработкой видеопотокаУправление с помощью Telegram ботаСама система доказала свою работоспособность и теперь хотелось создать для неё удобный интерфейс для включения/выключения. Телеграм бот показался отличным вариантом решения для этой задачи.С помощью пакета python-telegram-bot была написана простая оболочка, принимающая в себя callback включения/выключения системы и список доверенных никнеймов.
class TelegramInterface:
    def __init__(self, login_whitelist, state_callback):
        self.state_callback = state_callback
        self.login_whitelist = login_whitelist
        self.updater = Updater(
            token = "###", use_context = True)
        self.run()
    def run(self):
        dispatcher = self.updater.dispatcher
        dispatcher.add_handler(CommandHandler("start", self.start))
        dispatcher.add_handler(CommandHandler("run", self.run_intercom))
        dispatcher.add_handler(CommandHandler("stop", self.stop_intercom))
        self.updater.start_polling()
    def run_intercom(self, update: Update, context: CallbackContext):
        user = update.message.from_user
        update.message.reply_text(
            self.state_callback(True) if user.username in self.login_whitelist else 'not allowed',
            reply_to_message_id=update.message.message_id)
    def stop_intercom(self, update: Update, context: CallbackContext):
        user = update.message.from_user
        update.message.reply_text(
            self.state_callback(False) if user.username in self.login_whitelist else 'not allowed',
            reply_to_message_id=update.message.message_id)
    def start(self, update: Update, context: CallbackContext) -> None:
        update.message.reply_text('Hi!')
class TelegramBotThreadWrapper(threading.Thread):
    def __init__(self, state_callback, name='telegram-bot-wrapper'):
        self.whitelist = ["###", "###"]
        self.state_callback = state_callback
        super(TelegramBotThreadWrapper, self).__init__(name=name)
        self.start()
    def run(self):
        self.bot = TelegramInterface(self.whitelist, self.state_callback)
И следующая ступень эволюции функции intercom_id, обрабатывающая синхронизацию между потоками обработки данных и получения команд от бота:
def stream_based_intercom_id_with_telegram():
    processor = ImageProcessor()
    loop_state_lock = threading.Lock()
    loop_should_run = False
    loop_should_change_state_cv = threading.Condition(loop_state_lock)
    is_loop_finished = True
    loop_changed_state_cv = threading.Condition(loop_state_lock)
    def stream_processing_loop():
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv
        while True:
            with loop_should_change_state_cv:
                loop_should_change_state_cv.wait_for(lambda: loop_should_run)
                is_loop_finished = False
                loop_changed_state_cv.notify_all()
                logging.warning(f'Loop is started')
            link = get_videostream_link()
            last_time = time.time()
            last_open_door_time = time.time()
            for result, np_image in processor.process_stream(link):
                with loop_should_change_state_cv:
                    if not loop_should_run:
                        is_loop_finished = True
                        loop_changed_state_cv.notify_all()
                        logging.warning(f'Loop is stopped')
                        break
                current_time = time.time()
                delta_time = current_time - last_time
                if delta_time < 1:
                    logging.info(f'{result} in {delta_time}')
                else:
                    logging.warning(f'{result} in {delta_time}')
                last_time = current_time
                if result is None:
                    continue
                if any(['face' in res for res in result]):
                    if current_time - last_open_door_time > 5:
                        logging.warning(f'Hey, I know you - {result[0]}! Opening the door...')
                        last_open_door_time = current_time
                        open_door()
                        cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)
    def state_callback(is_running):
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv
        with loop_should_change_state_cv:
            if is_running == loop_should_run:
                return "Intercom service state is not changed"
            loop_should_run = is_running
            if loop_should_run:
                loop_should_change_state_cv.notify_all()
                loop_changed_state_cv.wait_for(lambda: not is_loop_finished)
                return "Intercom service is up"
            else:
                loop_changed_state_cv.wait_for(lambda: is_loop_finished)
                return "Intercom service is down"
    telegram_bot = TelegramBotThreadWrapper(state_callback)
    logging.warning("Bot is ready")
    stream_processing_loop()
РезультатВидео:Извините, данный ресурс не поддреживается. :( ПослесловиеНесмотря на возможности, которые технология умных домофонов несёт жильцам, объединённые в единую сеть сотни (тысячи?) подъездных дверей с камерами и микрофонами (да, в произвольно получаемом видеопотоке есть и аудио!), ведущими непрерывную запись — как по мне, скорее пугающее явление, открывающее новые возможности для нарушения приватности.Я бы предпочёл, чтобы доступ к видеопотоку предоставлялся только в момент звонка в квартиру и ведущаяся трёхдневная запись, позиционирующаяся как средство раскрытия правонарушений, хранилась не на серверах компании, а непосредственно в домофоне, с возможностью доступа к ней по запросу. Или не велась вовсе.
===========
Источник:
habr.com
===========

Похожие новости: Теги для поиска: #_informatsionnaja_bezopasnost (Информационная безопасность), #_obrabotka_izobrazhenij (Обработка изображений), #_domofon (домофон), #_informatsionnaja_bezopasnost (информационная безопасность), #_mitm, #_raspoznavanie_lits (распознавание лиц), #_openvino, #_informatsionnaja_bezopasnost (
Информационная безопасность
)
, #_obrabotka_izobrazhenij (
Обработка изображений
)
Профиль  ЛС 
Показать сообщения:     

Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы

Текущее время: 22-Ноя 08:01
Часовой пояс: UTC + 5