[Python, Разработка на Raspberry Pi, Умный дом, DIY или Сделай сам] «Умная камера» на базе Raspberry Pi с управлением через Telegram-бота
Автор
Сообщение
news_bot ®
Стаж: 6 лет 9 месяцев
Сообщений: 27286
Привет, меня зовут Иван. Сразу отвечу на главный вопрос: почему стал собирать сам, а не взял готовое решение? Во-первых, стоимость готовых решений - Raspberry Pi со всеми датчиками и камерой вышла не больше $30, большая часть еще по курсу 60 рублей за доллар. Во-вторых, почти все части уже были - Raspberry Pi отдал брат, камера осталась еще с лохматых времен, диод тоже был - покупал для Arduino, а датчик движения на Aliexpress стоил не больше 100 рублей.Повествование в статье будет построено следующим образом:
- Определим что нам потребуется;
- Поработаем с диодом;
- С датчиком движения;
- С камерой (фото);
- С камерой (видео);
- Разберем работу с Telegram-ботом (рассмотрим скелет);
- Создадим "Умную камеру";
- Посмотрим на работу "Умной камеры";
- Определим узкие места и возможные решения.
Итак, поехалиЧто нам потребуется
На Raspberry должен быть настроен интернет, установлены:
- ffmpeg - для записи видео с камеры;
- Python 3.7.
У Python должны быть следующие библиотеки:
- RPi.GPIO;
- pygame;
- telebot.
И понадобятся провода - F2F, что бы все это соединить.Работа с диодом
import RPi.GPIO as GPIO
import time
LED_PIN = 3
def setup():
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
GPIO.setup(LED_PIN, GPIO.OUT)
def destroy():
GPIO.output(LED_PIN, GPIO.LOW)
GPIO.cleanup()
def blink():
GPIO.output(LED_PIN, GPIO.HIGH)
time.sleep(1)
GPIO.output(LED_PIN, GPIO.LOW)
def main():
setup()
blink()
destroy()
if __name__ == '__main__':
main()
Импортируем библиотеку для работы с нодами Raspberry, определяем нод для диода. В методе настройки библиотеки указываем, что нумерация нодов будет в соответствии с номерами на самой Raspberry, отключаем все предупреждения библиотеки и сообщаем, что на нод диода ток будет выходить, а не считываться. В методе очистки ресурсов отключаем диод и очищаем библиотеку. В главном методе - blink подаем сигнал на нод диода, ждем 1 секунду и выключаем подачу сигнала на нод диода. В методе main вызываем все методы подряд и больше ничего не делаем. Светом диода будем отображать обнаруженение движения датчиком.Работа с датчиком движенияПеред подключением датчика движения к Raspberry необходимо для начала определить какой провод за что отвечает, потому что на некоторых датчиках земля находится слева, на некоторых - справа и это будет кардинально влиять на работу датчика:
После этого можно подключать датчик и настраивать его физически - скорректировать значение delay и sensitivity для своих нужд, вот пример моей настройки:
Помимо этого, есть еще один параметр настройки у датчика движения, хоть он и не бросается в глаза. На обратной стороне датчика есть "ключ", которым можно настроить дальность работы датчика, для своих нужд я переключил его:
В принципе, работа с датчиком движения сводится к одному методу - методу чтения сигнала с датчика (16 строка).
import RPi.GPIO as GPIO
import time
PIR_PIN = 11
def setup():
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
GPIO.setup(PIR_PIN, GPIO.IN)
def destroy():
GPIO.cleanup()
def sensorJob():
while True:
i = GPIO.input(PIR_PIN)
print("PIR value: " + str(i))
time.sleep(0.1)
def main():
setup()
sensorJob()
destroy()
if __name__ == '__main__':
main()
Определяем нод для датчика движения. В методе настройки библиотеки указываем, что по указанном ноду будет читаться сигнал. В главном методе - sensorJob в бесконечном цикле считываем сигнал, выводим значение в консоль и ожидаем небольшой промежуток времени.Работа с камерой (фото)С "железом" разобрались и теперь можно переходить к съемке фото и видео. Подключаем камеру к любому USB-порту, по необходимости, настраиваем. Вероятнее всего, операционная система сама подцепит нужные драйвера и дополнительной настройки не потребуется. Если у вас подключена 1 камера она будет называться /dev/video0. Также важно определить разрешение камеры, потому что можно получить фотографии сплошным черным цветом.
from datetime import datetime
import pygame
import pygame.camera
pygame.init()
pygame.camera.init()
pygame.camera.list_cameras()
cam = pygame.camera.Camera("/dev/video0", (640,426))
def saveCapture():
filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg'
cam.start()
pygame.image.save(cam.get_image(), filename)
cam.stop()
def main():
setup()
saveCapture()
destroy()
if __name__ == '__main__':
main()
Импортируем библиотеку для работы с камерой. Инициализируем библиотеку, инициализируем расширение для работы с камерой, выводим список доступных камер, сохраняем нужную нам камеру в переменную, во втором параметре указываем разрешение камеры. В методе saveCapture определяем название файла, вызываем метод start у камеры, через метод get_image() у камеры получаем объект со снимком, сохраняем его методом pygame.image.save, в завершение останавливаем работу камеры методом stop. Очень важно останавливать работу камеры через библиотеку pygame, потому что библиотека ffmpeg будет пытаться использовать камеру и в случае, если мы ее не освободим будет завершаться ошибкой.Работа с камерой (видео)Для съемки видео мы будем отправлять в терминал сообщение следующего вида:
ffmpeg -f v4l2 -framerate 25 -video_size 640x426 -i /dev/video0 -t 5 -c copy <filename>
Где будем указывать формат видео - v4l2, количетсво кадров, разрешение съемки, главный аргумент -c copy, если у вас медленная камера, с этим параметром съемка и сохранение файла будет происходить в десятки раз быстрее, но будет отсутствовать preview у видео и в Telegram не будет отображаться количество времени, которое длится видео. Также, нельзя будет просмотреть видео прямо из Telegram, потребуется отдельный плеер.Каркас Telegram-ботаПеред рассмотрением работы с Telegram-ботом предполагаем, что вы уже получили ключ от BotFather и выполнили настройку бота на стороне Telegram, там нет ничего сложного, поэтому не будем на этом останавливаться. Также, мы будем работать сразу с InlineKeyboard у Telegram-бота, потому что это удобно и не нужно переживать о том, как правильно писать ту или иную команду. Кроме того, в самом начале будем делать проверку на ID пользователя, чтобы никто случайно (или специально) не подглядывал в нашу камеру.
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import telebot
TOKEN = '99999999999:xxxxxxxxxxxxxxxxxx'
ADMIN_USER_ID = 999999999
bot = telebot.TeleBot(TOKEN)
@bot.message_handler(commands=['start'])
def start(message):
telegram_user = message.from_user
if telegram_user.id != ADMIN_USER_ID:
bot.send_message(message.chat.id, text="Hello. My name is James Brown. What do i do for you?")
return
keyboard = [
[
InlineKeyboardButton("Send capture", callback_data='sendCapture'),
InlineKeyboardButton("Send video", callback_data='sendVideo')
],
]
bot.send_message(chat_id=message.chat.id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
def sendCapture(chat_id):
bot.send_photo(chat_id, photo=open('<filename>', 'rb'))
def sendVideo(chat_id):
bot.send_video(chat_id, open('<filename>', 'rb'))
@bot.callback_query_handler(func=lambda call: True)
def button(call):
globals()[call.data](call.message.chat.id)
def main():
setup()
bot.polling(none_stop=False, interval=5, timeout=20)
destroy()
if __name__ == '__main__':
main()
Импортируем классы для InlineKeyboard, импортируем библиотеку Telegram-бота. В переменной TOKEN указываем ключ от BotFather. Сохраняем ADMIN ID. С помощью нотаций указываем команду для метода start. В методе start делаем проверку на USER ID и если это не мы, то выводим сообщение и выходим из метода, если же это мы, то отправляем InlineKeyboard с доступными командами. При отправке клавиатуры можно также сразу отправлять и текст. В методах sendCapture и sendVideo показаны примеры команд для отправки фото и видео через Telegram-бота. Далее нам нужно указать метод, который будет обрабатывать нажатия на InlineKeyboard, тоже через нотацию, но теперь нотация немного другая, вызов метода по нажатию на кнопку делаем по имени этого метода. В методе main вызываем bot.pooling, который в бесконечном цикле проверяет наличие входящих сообщений боту. Если у вас быстрая Raspberry и интернет, то можете не указывать параметры у этого метода и оставить их по-умолчанию. У меня стабильность отправки сообщений была низкой из-за чего пришлось добавить interval = 5. В конце статьи расскажу на что это влияет."Умная камера"Подключаем диод и датчик движения к нужным GPIO нодам, камеру к USB.Нумерация нодов
У меня это выглядит вот так:
Теперь соберем все куски логики, описанные ранее и получим "Умную камеру" с управлением через Telegram-бота.
from datetime import datetime
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import telebot
import logging
import pygame
import pygame.camera
import RPi.GPIO as GPIO
import threading
import time
import os
TOKEN = '99999999999:xxxxxxxxxxxxxxxxxxxxxx'
ADMIN_USER_ID = 999999999999
LED_PIN = 3
PIR_PIN = 11
VIDEO_FILE_FORMAT = '.mkv'
# Enable Logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
isSensorEnabled = False
isMuteNotifications = False
last_chat_id = -1
keyboard = []
pygame.init()
pygame.camera.init()
pygame.camera.list_cameras()
cam = pygame.camera.Camera("/dev/video0", (640,426))
bot = telebot.TeleBot(TOKEN)
def setup():
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
GPIO.setup(LED_PIN, GPIO.OUT)
GPIO.setup(PIR_PIN, GPIO.IN)
def destroy():
GPIO.output(LED_PIN, GPIO.LOW)
GPIO.cleanup()
def log_params(method_name, message):
logger.debug("Method: %s\nFrom: %s\nchat_id: %d\nText: %s" %
(method_name,
message.from_user,
message.chat.id,
message.text))
@bot.message_handler(commands=['start'])
def start(message):
global keyboard
log_params('start', message)
telegram_user = message.from_user
if telegram_user.id != ADMIN_USER_ID:
bot.send_message(message.chat.id, text="Hello. My name is James Brown. What do i do for you?")
return
keyboard = [
[InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
[
InlineKeyboardButton("Get capture", callback_data='get_capture'),
InlineKeyboardButton("Get video", callback_data='get_video')
],
]
bot.send_message(chat_id=message.chat.id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
def sendCapture(chat_id):
filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg'
if (os.path.exists(filename)):
bot.send_photo(chat_id, photo=open(filename, 'rb'))
else:
cam.start()
pygame.image.save(cam.get_image(), filename)
cam.stop()
bot.send_photo(chat_id, photo=open(filename, 'rb'))
def get_capture(chat_id):
sendCapture(chat_id)
bot.send_message(chat_id=chat_id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
def sendVideo(chat_id):
filename = sorted(list(filter(lambda x: x.endswith(VIDEO_FILE_FORMAT), os.listdir())))[-1]
bot.send_video(chat_id, open(filename, 'rb'))
def captureVideo():
filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + VIDEO_FILE_FORMAT
os.system("ffmpeg -f v4l2 -framerate 25 -video_size 640x426 -i /dev/video0 -t 5 -c copy "" + filename + """)
return filename
def get_video(chat_id):
bot.send_message(chat_id=chat_id,
text="Capturing video..")
filename = captureVideo()
bot.send_message(chat_id=chat_id,
text="Sending video..")
bot.send_video(chat_id, open(filename, 'rb'))
bot.send_message(chat_id=chat_id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
def sensorJob():
global isSensorEnabled
global keyboard
isRecording = False
while isSensorEnabled:
i = GPIO.input(PIR_PIN)
GPIO.output(LED_PIN, i)
if (i == 1 and not isRecording):
isRecording = True
if (not isMuteNotifications):
sendCapture(last_chat_id)
if (isRecording):
captureVideo()
if (i == 0 and isRecording):
if (not isMuteNotifications):
sendVideo(last_chat_id)
isRecording = False
time.sleep(0.1)
if (isRecording):
sendVideo(last_chat_id)
keyboard = [
[InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
[
InlineKeyboardButton("Get capture", callback_data='get_capture'),
InlineKeyboardButton("Get video", callback_data='get_video')
],
]
bot.send_message(chat_id=last_chat_id,
text="Sensor stopped")
bot.send_message(chat_id=last_chat_id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
def start_sensor(chat_id):
global keyboard
global isSensorEnabled
global last_chat_id
last_chat_id = chat_id
isSensorEnabled = True
threading.Thread(target=sensorJob).start()
keyboard = [
[
InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
InlineKeyboardButton("Mute notifications", callback_data='mute_notifications')
]
]
bot.send_message(chat_id=chat_id,
text="Sensor started")
bot.send_message(chat_id=chat_id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
def stop_sensor(chat_id):
global keyboard
global last_chat_id
last_chat_id = -1
isSensorEnabled = False
GPIO.output(LED_PIN, GPIO.LOW)
keyboard = [
[InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
[
InlineKeyboardButton("Get capture", callback_data='get_capture'),
InlineKeyboardButton("Get video", callback_data='get_video')
],
]
bot.send_message(chat_id=chat_id,
text="Sensor stop requested")
def mute_notifications(chat_id):
global keyboard
isMuteNotifications = True
keyboard = [
[
InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
InlineKeyboardButton("Unmute notifications", callback_data='unmute_notifications')
]
]
bot.send_message(chat_id=chat_id,
text="Notifications muted")
bot.send_message(chat_id=chat_id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
def unmute_notifications(chat_id):
global keyboard
isMuteNotifications = False
keyboard = [
[
InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
InlineKeyboardButton("Mute notifications", callback_data='mute_notifications')
]
]
bot.send_message(chat_id=chat_id,
text="Notifications unmuted")
bot.send_message(chat_id=chat_id,
text="Supported commands:",
reply_markup=InlineKeyboardMarkup(keyboard))
@bot.callback_query_handler(func=lambda call: True)
def button(call):
globals()[call.data](call.message.chat.id)
def main():
setup()
bot.polling(none_stop=False, interval=5, timeout=20)
destroy()
if __name__ == '__main__':
main()
Работу сенсора будем запускать в отдельном потоке, поэтому импортируем необходимую библиотеку. Создаем переменную для хранения расширения видео. Логирование из библиотеки logging не влияет на работу "Умной камеры", поэтому не будем заострять на этом внимание. Создаем переменные для флагов о работе сенсора и необходимости отправки уведомлений. Создаем переменную для хранения последнего chat_id, зачем это - расскажу позже. Переменная для хранения клавиатуры нужна, чтобы можно было повторно отправлять нужную клавиатуру. Дальше идет знакомый нам код по инициализации библиотеки pygame и Telegram-бота. Методы setup и destroy нам уже знакомы, метод логирования можно проигнорировать. В методе start проверяем user_id, если пользователь - мы, то отправляем актуальную клавиатуру. В методе sendCapture определяем filename, создаем изображение с камеры и отправляем по chat_id. Метод get_capture нужен для отправки изображения и клавиатуры с актуальными коммандами. Метод sendVideo отвечает за отправку последнего записанного видео. В методе captureVideo определяем filename и в систему отправляем запрос на запись видео с помощью утилиты ffmpeg. Метод get_video служит для отправки видео с отправкой промежуточных статусов. Дальше рассмотрим ключевой метод sensorJob: здесь в бесконечном цикле считываем сигнал с датчика движения, выводим его на диод и при наличии сигнала сразу отправляем фото и включаем запись видео. Когда работа цикла завершается - отправляем сообщение об отключении датчика движения и актуальную клавиатуру. Так как метод sensorJob работает из потока - здесь нам и понадобится переменная last_chat_id. В методе start_sensor запоминаем сhat_id, устанавливаем флаг работы датчика движения, запускаем поток с методом sensorJob и отправляем статус и актуальную клавиатуру. В методе stop_sensor чистим last_chat_id, устанавливаем флаг работы датчика движения в ложное состояние, отключаем диод и отправляем актуальную клавиатуру. В методах mute/unmute_notifications переключаем в соответствующее положение флаг и отправляем статус и актуальную клавиатуру. На этом логика "Умной камеры" на текущий момент заканчивается. Теперь посмотрим на "Умную камеру" в действии.Работа "Умной камеры"Извините, данный ресурс не поддреживается. :( Обратите внимание на время начала записи и получения фото и видео - оно очень большое для реальных кейсов, больше 5 минут. Это связано с параметрами в bot.pooling в методе main, а именно с параметром interval. Если оставить его по-умолчанию, то задержка будет минимальна и, как минимум, фото будет приходить практически моментально, но тогда возможны сбои из-за "Connection timeout" в работе Telegram-клиента и надо как-то обрабатывать это дополнительно.Узкие места и возможные решения
- Необходима быстрая флешка, чтобы успевать сохранять видео;
- Желательно запись видео инициировать из Python напрямую, например, через библиотеку pyffmpeg, чтобы уменьшить время на инициализацию библиотеки при каждой записи видео;
- Хорошая камера, чтобы можно было рязглядеть нарушителя;
- Наличие быстрого интернета для более быстрого получения актуального видео.
===========
Источник:
habr.com
===========
Похожие новости:
- [Open source, Разработка игр, Графический дизайн, Дизайн игр, DIY или Сделай сам] О ходе создания русской народной игры «Колобок» в феврале
- [Законодательство в IT, Социальные сети и сообщества, IT-компании] Роскомнадзор отправил в суд протоколы на Facebook, Instagram, Twitter, TikTok, «ВКонтакте», Telegram и YouTube
- [Python, Разработка игр, Тестирование игр] Как убедить гейм-дизайнера запустить тесты?
- [Python, Программирование, Машинное обучение] Поиск нарушений на видео с помощью компьютерного зрения
- [Разработка веб-сайтов, Python, Django, Функциональное программирование] Делаем тесты частью приложения (перевод)
- [Компьютерное железо, Старое железо, Настольные компьютеры, DIY или Сделай сам] Блогер собрал iMac на M1 из старого iMac и Mac mini
- [Программирование, Scala] Scala 3 / Dotty – Факты и Мнения. Что мы ожидаем? (перевод)
- [Мессенджеры, Фриланс] Фиксирование договоренностей прямо в чате экономя нервы, время и деньги
- [Python, Django, Машинное обучение, Конференции] «Хитрый питон» Михаил Корнеев, Григорий Петров, Илья Беда и другие классные спикеры-тезисы выступлений на PyCon Weekend
- [3D-принтеры, DIY или Сделай сам] Пепельница-терминатор на 3D-принтере: проект от идеи до воплощения
Теги для поиска: #_python, #_razrabotka_na_raspberry_pi (Разработка на Raspberry Pi), #_umnyj_dom (Умный дом), #_diy_ili_sdelaj_sam (DIY или Сделай сам), #_raspberrypi, #_python, #_umnyj_dom (умный дом), #_telegram, #_telegrambot, #_python, #_razrabotka_na_raspberry_pi (
Разработка на Raspberry Pi
), #_umnyj_dom (
Умный дом
), #_diy_ili_sdelaj_sam (
DIY или Сделай сам
)
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 12:53
Часовой пояс: UTC + 5
Автор | Сообщение |
---|---|
news_bot ®
Стаж: 6 лет 9 месяцев |
|
Привет, меня зовут Иван. Сразу отвечу на главный вопрос: почему стал собирать сам, а не взял готовое решение? Во-первых, стоимость готовых решений - Raspberry Pi со всеми датчиками и камерой вышла не больше $30, большая часть еще по курсу 60 рублей за доллар. Во-вторых, почти все части уже были - Raspberry Pi отдал брат, камера осталась еще с лохматых времен, диод тоже был - покупал для Arduino, а датчик движения на Aliexpress стоил не больше 100 рублей.Повествование в статье будет построено следующим образом:
На Raspberry должен быть настроен интернет, установлены:
И понадобятся провода - F2F, что бы все это соединить.Работа с диодом import RPi.GPIO as GPIO
import time LED_PIN = 3 def setup(): GPIO.setmode(GPIO.BOARD) GPIO.setwarnings(False) GPIO.setup(LED_PIN, GPIO.OUT) def destroy(): GPIO.output(LED_PIN, GPIO.LOW) GPIO.cleanup() def blink(): GPIO.output(LED_PIN, GPIO.HIGH) time.sleep(1) GPIO.output(LED_PIN, GPIO.LOW) def main(): setup() blink() destroy() if __name__ == '__main__': main() После этого можно подключать датчик и настраивать его физически - скорректировать значение delay и sensitivity для своих нужд, вот пример моей настройки: Помимо этого, есть еще один параметр настройки у датчика движения, хоть он и не бросается в глаза. На обратной стороне датчика есть "ключ", которым можно настроить дальность работы датчика, для своих нужд я переключил его: В принципе, работа с датчиком движения сводится к одному методу - методу чтения сигнала с датчика (16 строка). import RPi.GPIO as GPIO
import time PIR_PIN = 11 def setup(): GPIO.setmode(GPIO.BOARD) GPIO.setwarnings(False) GPIO.setup(PIR_PIN, GPIO.IN) def destroy(): GPIO.cleanup() def sensorJob(): while True: i = GPIO.input(PIR_PIN) print("PIR value: " + str(i)) time.sleep(0.1) def main(): setup() sensorJob() destroy() if __name__ == '__main__': main() from datetime import datetime
import pygame import pygame.camera pygame.init() pygame.camera.init() pygame.camera.list_cameras() cam = pygame.camera.Camera("/dev/video0", (640,426)) def saveCapture(): filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg' cam.start() pygame.image.save(cam.get_image(), filename) cam.stop() def main(): setup() saveCapture() destroy() if __name__ == '__main__': main() ffmpeg -f v4l2 -framerate 25 -video_size 640x426 -i /dev/video0 -t 5 -c copy <filename>
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import telebot TOKEN = '99999999999:xxxxxxxxxxxxxxxxxx' ADMIN_USER_ID = 999999999 bot = telebot.TeleBot(TOKEN) @bot.message_handler(commands=['start']) def start(message): telegram_user = message.from_user if telegram_user.id != ADMIN_USER_ID: bot.send_message(message.chat.id, text="Hello. My name is James Brown. What do i do for you?") return keyboard = [ [ InlineKeyboardButton("Send capture", callback_data='sendCapture'), InlineKeyboardButton("Send video", callback_data='sendVideo') ], ] bot.send_message(chat_id=message.chat.id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) def sendCapture(chat_id): bot.send_photo(chat_id, photo=open('<filename>', 'rb')) def sendVideo(chat_id): bot.send_video(chat_id, open('<filename>', 'rb')) @bot.callback_query_handler(func=lambda call: True) def button(call): globals()[call.data](call.message.chat.id) def main(): setup() bot.polling(none_stop=False, interval=5, timeout=20) destroy() if __name__ == '__main__': main() У меня это выглядит вот так: Теперь соберем все куски логики, описанные ранее и получим "Умную камеру" с управлением через Telegram-бота. from datetime import datetime
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton import telebot import logging import pygame import pygame.camera import RPi.GPIO as GPIO import threading import time import os TOKEN = '99999999999:xxxxxxxxxxxxxxxxxxxxxx' ADMIN_USER_ID = 999999999999 LED_PIN = 3 PIR_PIN = 11 VIDEO_FILE_FORMAT = '.mkv' # Enable Logging logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) isSensorEnabled = False isMuteNotifications = False last_chat_id = -1 keyboard = [] pygame.init() pygame.camera.init() pygame.camera.list_cameras() cam = pygame.camera.Camera("/dev/video0", (640,426)) bot = telebot.TeleBot(TOKEN) def setup(): GPIO.setmode(GPIO.BOARD) GPIO.setwarnings(False) GPIO.setup(LED_PIN, GPIO.OUT) GPIO.setup(PIR_PIN, GPIO.IN) def destroy(): GPIO.output(LED_PIN, GPIO.LOW) GPIO.cleanup() def log_params(method_name, message): logger.debug("Method: %s\nFrom: %s\nchat_id: %d\nText: %s" % (method_name, message.from_user, message.chat.id, message.text)) @bot.message_handler(commands=['start']) def start(message): global keyboard log_params('start', message) telegram_user = message.from_user if telegram_user.id != ADMIN_USER_ID: bot.send_message(message.chat.id, text="Hello. My name is James Brown. What do i do for you?") return keyboard = [ [InlineKeyboardButton("Start sensor", callback_data='start_sensor')], [ InlineKeyboardButton("Get capture", callback_data='get_capture'), InlineKeyboardButton("Get video", callback_data='get_video') ], ] bot.send_message(chat_id=message.chat.id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) def sendCapture(chat_id): filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg' if (os.path.exists(filename)): bot.send_photo(chat_id, photo=open(filename, 'rb')) else: cam.start() pygame.image.save(cam.get_image(), filename) cam.stop() bot.send_photo(chat_id, photo=open(filename, 'rb')) def get_capture(chat_id): sendCapture(chat_id) bot.send_message(chat_id=chat_id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) def sendVideo(chat_id): filename = sorted(list(filter(lambda x: x.endswith(VIDEO_FILE_FORMAT), os.listdir())))[-1] bot.send_video(chat_id, open(filename, 'rb')) def captureVideo(): filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + VIDEO_FILE_FORMAT os.system("ffmpeg -f v4l2 -framerate 25 -video_size 640x426 -i /dev/video0 -t 5 -c copy "" + filename + """) return filename def get_video(chat_id): bot.send_message(chat_id=chat_id, text="Capturing video..") filename = captureVideo() bot.send_message(chat_id=chat_id, text="Sending video..") bot.send_video(chat_id, open(filename, 'rb')) bot.send_message(chat_id=chat_id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) def sensorJob(): global isSensorEnabled global keyboard isRecording = False while isSensorEnabled: i = GPIO.input(PIR_PIN) GPIO.output(LED_PIN, i) if (i == 1 and not isRecording): isRecording = True if (not isMuteNotifications): sendCapture(last_chat_id) if (isRecording): captureVideo() if (i == 0 and isRecording): if (not isMuteNotifications): sendVideo(last_chat_id) isRecording = False time.sleep(0.1) if (isRecording): sendVideo(last_chat_id) keyboard = [ [InlineKeyboardButton("Start sensor", callback_data='start_sensor')], [ InlineKeyboardButton("Get capture", callback_data='get_capture'), InlineKeyboardButton("Get video", callback_data='get_video') ], ] bot.send_message(chat_id=last_chat_id, text="Sensor stopped") bot.send_message(chat_id=last_chat_id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) def start_sensor(chat_id): global keyboard global isSensorEnabled global last_chat_id last_chat_id = chat_id isSensorEnabled = True threading.Thread(target=sensorJob).start() keyboard = [ [ InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'), InlineKeyboardButton("Mute notifications", callback_data='mute_notifications') ] ] bot.send_message(chat_id=chat_id, text="Sensor started") bot.send_message(chat_id=chat_id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) def stop_sensor(chat_id): global keyboard global last_chat_id last_chat_id = -1 isSensorEnabled = False GPIO.output(LED_PIN, GPIO.LOW) keyboard = [ [InlineKeyboardButton("Start sensor", callback_data='start_sensor')], [ InlineKeyboardButton("Get capture", callback_data='get_capture'), InlineKeyboardButton("Get video", callback_data='get_video') ], ] bot.send_message(chat_id=chat_id, text="Sensor stop requested") def mute_notifications(chat_id): global keyboard isMuteNotifications = True keyboard = [ [ InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'), InlineKeyboardButton("Unmute notifications", callback_data='unmute_notifications') ] ] bot.send_message(chat_id=chat_id, text="Notifications muted") bot.send_message(chat_id=chat_id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) def unmute_notifications(chat_id): global keyboard isMuteNotifications = False keyboard = [ [ InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'), InlineKeyboardButton("Mute notifications", callback_data='mute_notifications') ] ] bot.send_message(chat_id=chat_id, text="Notifications unmuted") bot.send_message(chat_id=chat_id, text="Supported commands:", reply_markup=InlineKeyboardMarkup(keyboard)) @bot.callback_query_handler(func=lambda call: True) def button(call): globals()[call.data](call.message.chat.id) def main(): setup() bot.polling(none_stop=False, interval=5, timeout=20) destroy() if __name__ == '__main__': main()
=========== Источник: habr.com =========== Похожие новости:
Разработка на Raspberry Pi ), #_umnyj_dom ( Умный дом ), #_diy_ili_sdelaj_sam ( DIY или Сделай сам ) |
|
Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете голосовать в опросах
Вы не можете прикреплять файлы к сообщениям
Вы не можете скачивать файлы
Текущее время: 22-Ноя 12:53
Часовой пояс: UTC + 5