Bekijk de onderstaande video om te zien hoe je onze site als een web app op je startscherm installeert.
Opmerking: Deze functie vereist momenteel het gebruik van de ingebouwde Safari browser.
import json, os
from pathlib import Path
from ring_doorbell import Ring, Auth
from oauthlib.oauth2 import MissingTokenError
import time
from datetime import datetime, timezone
import urllib.request as urllib
from threading import Thread
######### This section must be edited by you - including your_unique_api_name! ##################
# Ring account credentials; each value can be overridden by an environment variable (see below).
your_username = 'CHANGEME'
your_password = 'CHANGEME'
# Identifier passed as the first argument to Auth() when authenticating.
your_unique_api_name = 'PythonMonitorAPI/0.1.1'
# File in which the auth token is cached as JSON so it can be reused on the next start.
cache_file = Path("/app/Ring/cached_auth_token.json")
# Base directory for downloaded recordings. Paths are built by plain string concatenation,
# so this value must end with a slash.
videopath = '/app/Ring/Captures/'
poll_interval = 1.0 # interval in seconds that we poll the Ring API for new events
# Domoticz setup
# Base URL of the Domoticz instance that is notified when an event is detected.
domoticz_api = 'http://192.168.1.240:30000'
# Maps a Ring device name to the idx of the Domoticz switch that is switched on for its events.
domoticz_idx_mapping = {"Front Door": 528, "Back Door": 529}
#################################################################################################
###### DO NOT EDIT THESE IF YOU DO NOT KNOW WHAT THEY ARE FOR ###################################
# .env overrides: 1) create your .env file with the exported VARS below (i.e. export RING_USERNAME='MYUSERNAME')
# 2) source them and start this script like: $ source .env && python3 <script_name>.py
# Note: this is handled natively rather than with the python-dotenv package.
# Each setting falls back to the hard-coded value above when its environment variable is unset.
username = os.getenv('RING_USERNAME', your_username)
password = os.getenv('RING_PASSWORD', your_password)
unique_api_name = os.getenv('RING_API_NAME', your_unique_api_name)
def authenticate():
    """Create an authenticated Ring client.

    When the token cache file exists, its JSON content is handed to ``Auth``
    and no credentials are sent. Otherwise a token is fetched with the
    configured username and password; if that raises ``MissingTokenError``
    the fetch is retried with a 2FA code obtained from ``otp_callback()``.
    ``token_updated`` is registered as the token-update callback in both cases.

    Returns:
        A ``Ring`` instance wrapping the ``Auth`` session.
    """
    if not cache_file.is_file():
        # No cached token: log in with credentials.
        session = Auth(unique_api_name, None, token_updated)
        try:
            session.fetch_token(username, password)
        except MissingTokenError:
            # Retry the login with a one-time code typed by the user.
            session.fetch_token(username, password, otp_callback())
    else:
        cached_token = json.loads(cache_file.read_text())
        session = Auth(unique_api_name, cached_token, token_updated)
    return Ring(session)
def authenticate_and_initialize(gadget_type: str = "doorbells"):
    """Authenticate against Ring, refresh device data and select the devices to monitor.

    Args:
        gadget_type: Kind of device to return. Only ``"doorbells"`` is
            handled; for any other value an empty list is returned.

    Returns:
        A ``(ring_client, doorbells)`` tuple. ``doorbells`` is the
        ``'doorbots'`` entry of ``ring_client.devices()``, or ``[]`` when
        ``gadget_type`` is not ``"doorbells"``.
    """
    myring = authenticate()
    myring.update_data()
    devices = myring.devices()
    # Default keeps the return value well-defined for unsupported gadget types
    # (previously the name was unbound and the return raised UnboundLocalError).
    doorbells = []
    # Compare by value: identity comparison against a string literal only
    # works by accident of interning.
    if gadget_type == "doorbells":
        doorbells = devices['doorbots']
        enumerate_doorbells(doorbells)
    return myring, doorbells
def ts(filename_format: bool = False, dirname_format: bool = False):
    """Return the current local time formatted for logs, file names or directory names.

    Args:
        filename_format: When true, return ``HH-MM-SS``.
        dirname_format: When true, return ``YYYY-MM-DD/`` (with trailing
            slash). Takes precedence over ``filename_format``.

    Returns:
        The formatted timestamp; ``YYYY-MM-DD HH:MM:SS`` when neither flag is set.
    """
    if dirname_format:
        pattern = '%Y-%m-%d/'
    elif filename_format:
        pattern = '%H-%M-%S'
    else:
        pattern = '%Y-%m-%d %H:%M:%S'
    return datetime.now().strftime(pattern)
def construct_local_filename(triggered_device, device_name: str, last_event_type: str, newest_event_id: int):
    """Build the path of the .mp4 file for the device's most recent event.

    The time part of the name comes from the ``created_at`` value of the
    newest history entry, which is tagged as UTC and converted to the local
    timezone. The file is placed in today's directory (created on demand by
    ``daily_directory()``).

    Args:
        triggered_device: Device whose newest history entry supplies the timestamp.
        device_name: Device name; spaces are removed for the file name.
        last_event_type: Event kind embedded in the file name.
        newest_event_id: Event id; its last four characters end the file name.

    Returns:
        The full local path as a string.
    """
    latest_event = triggered_device.history(limit=1, enforce_limit=True)[0]
    created_local = latest_event['created_at'].replace(tzinfo=timezone.utc).astimezone(tz=None)
    clock_part = created_local.strftime('%H-%M-%S')
    day_dir = daily_directory()
    compact_name = device_name.replace(' ', '')
    id_suffix = str(newest_event_id)[-4:]
    return f"{videopath}{day_dir}{compact_name}_{last_event_type}_{clock_part}_{id_suffix}.mp4"
def daily_directory():
    """Ensure today's capture directory exists and return its name.

    The directory name is computed once so the path that is checked, created
    and returned is the same even when the call straddles midnight.

    Returns:
        The directory name relative to ``videopath``, e.g. ``2024-01-31/``.
    """
    day_dir = ts(dirname_format=True)
    daily_directory_path = videopath + day_dir
    if not os.path.isdir(daily_directory_path):
        # makedirs also creates a missing parent, and exist_ok tolerates another
        # download thread creating the directory between the check and this call.
        os.makedirs(daily_directory_path, exist_ok=True)
        print(f"{ts()}: Created new daily directory at {daily_directory_path}")
    return day_dir
def token_updated(token):
    """Write *token* to the cache file as JSON, replacing any previous content."""
    serialized = json.dumps(token)
    cache_file.write_text(serialized)
def otp_callback():
    """Prompt on the console for a 2FA code and return what the user typed."""
    return input("2FA code: ")
def enumerate_doorbells(doorbells):
    """Print one timestamped confirmation line for every discovered doorbell."""
    for device in doorbells:
        stamp = ts()
        print(f"{stamp}: Authentication and discovery successful for {device}.")
def notify_domoticz(device_name: str):
    """Switch on the Domoticz device mapped to *device_name*.

    The notification is best-effort: a device without an entry in
    ``domoticz_idx_mapping`` or a connection-level failure is reported on the
    console and does not raise, so it cannot block handling of the event.

    Args:
        device_name: Ring device name used as key into ``domoticz_idx_mapping``.
    """
    idx = domoticz_idx_mapping.get(device_name)
    if idx is None:
        # Without a mapping the request would be sent with "idx=None".
        print(f"{ts()}: No Domoticz idx mapped for {device_name}; skipping notification")
        return
    # "&param=" had been mangled into a pilcrow character, which produced an
    # invalid (non-ASCII) URL.
    url = f'{domoticz_api}/json.htm?type=command&param=switchlight&idx={idx}&switchcmd=On'
    try:
        # The context manager closes the response; the timeout keeps an
        # unreachable Domoticz host from stalling the caller indefinitely.
        with urllib.urlopen(url, timeout=10):
            pass
    except OSError as exc:  # URLError, HTTPError and socket timeouts are OSError subclasses
        print(f"{ts()}: Could not notify Domoticz of event at {device_name}: {exc}")
        return
    print(f"{ts()}: Domoticz notified of event at {device_name}")
def find_newest_recording_id(triggered_device):
    """Return the larger of the device's last recording id and newest history id.

    The history is fetched only once; the previous implementation requested
    it twice, which doubled the API traffic and could fail if the second
    response came back empty.

    Args:
        triggered_device: Object exposing ``last_recording_id`` and
            ``history(limit=..., enforce_limit=...)``.

    Returns:
        ``max(last_recording_id, newest_history_id)`` when both ids are
        truthy, otherwise ``None`` (including when the history is empty).
    """
    last_recorded_id = triggered_device.last_recording_id or None
    history = triggered_device.history(limit=1, enforce_limit=True)
    if not history:
        return None
    newest_history_id = history[0].get('id')
    if last_recorded_id and newest_history_id:
        return max(last_recorded_id, newest_history_id)
    return None
def check_device_has_active_alerts(device_name: str, doorbells):
    """Look up the alerting device and decide whether its latest event should be recorded.

    Events whose kind contains ``"on_demand"`` are skipped. For every other
    event Domoticz is notified.

    Args:
        device_name: Name of the device reported by the alert.
        doorbells: Iterable of devices; the first whose ``name`` equals
            ``device_name`` is used.

    Returns:
        ``(triggered_device, last_event_type, newest_event_id)`` for an event
        that should be handled, or ``(None, None, None)`` when the device is
        unknown, the event is skipped, or an error occurred.
    """
    try:
        triggered_device = next((doorbell for doorbell in doorbells if doorbell.name == device_name), None)
        if triggered_device is None:
            # Explicit guard instead of relying on an AttributeError being swallowed below.
            print(f"{ts()}: No known doorbell named {device_name}; ignoring alert")
            return None, None, None
        last_event_type = triggered_device.history(limit=1)[0]['kind']
        newest_event_id = find_newest_recording_id(triggered_device)
        if "on_demand" in last_event_type:
            print(f"{ts()}: skipping {device_name} '{last_event_type}' event (id: {newest_event_id})")
            return None, None, None
        print(f"{ts()}: {device_name} '{last_event_type}' event was triggered (id: {newest_event_id})")
        notify_domoticz(device_name)
    except Exception as exc:
        # Exception (not a bare except) lets KeyboardInterrupt/SystemExit through,
        # and the failure is reported instead of vanishing silently.
        print(f"{ts()}: Error while checking {device_name} for active alerts: {exc!r}")
        return None, None, None
    return triggered_device, last_event_type, newest_event_id
def download_recording(newest_event_id: int = None, local_filename: str = None, device_name: str = None,
                       doorbells=None):
    """Download one recording to disk, retrying up to five times.

    Intended to run in a worker thread. After a failed attempt the function
    waits five seconds before trying again; it stops after the first attempt
    that does not raise.

    Args:
        newest_event_id: Id of the recording to download.
        local_filename: Destination path of the video file.
        device_name: Name of the device that owns the recording.
        doorbells: Iterable of devices searched by ``name``; ``None`` is
            treated as empty.
    """
    # "or ()" keeps the documented default (None) from raising TypeError.
    candidates = doorbells or ()
    triggered_device = next((doorbell for doorbell in candidates if doorbell.name == device_name), None)
    print(f"{ts()}: Attempting to download recording for {device_name} (id: {newest_event_id})...\n")
    if not (newest_event_id and local_filename and triggered_device):
        return
    remaining_download_tries = 5
    while remaining_download_tries > 0:
        try:
            if triggered_device.recording_download(newest_event_id, filename=local_filename, override=True,
                                                   timeout=120):
                print(f"{ts()}: Saved {device_name} video to: {local_filename}")
        except Exception:
            # Narrowed from a bare except so interpreter-exit signals are not retried.
            print(f"{ts()}: Error downloading {device_name} video from on try number "
                  f"{str(6 - remaining_download_tries)}.")
            remaining_download_tries = remaining_download_tries - 1
            time.sleep(5.0)
            continue
        else:
            print(f"{ts()}: {device_name} (id: {newest_event_id}) download thread complete.\n")
            break
def queue_download(newest_event_id: int = None, local_filename: str = None, device_name: str = None, doorbells=None):
    """Start a background thread that downloads the given recording.

    A thread running ``download_recording`` is started only when the event
    id, file name and device name are all truthy; otherwise an error line is
    printed and nothing is started.
    """
    if not (newest_event_id and local_filename and device_name):
        print(f"{ts()}: Unexpected error in queue_download()\n")
        return
    worker = Thread(
        target=download_recording,
        args=(newest_event_id, local_filename, device_name, doorbells),
    )
    worker.start()
    print(
        f"{ts()}: Download thread created (id: {newest_event_id}). Recording of this event is queued for download.")
def queue_download_to_local_file(newest_event_id, triggered_device, last_event_type: str, device_name: str, doorbells):
    """Work out the destination file for an event and queue its download."""
    target_path = construct_local_filename(triggered_device, device_name, last_event_type, newest_event_id)
    print(f"{ts()}: Spawning download thread for {device_name} (id: {newest_event_id})...")
    queue_download(
        newest_event_id=newest_event_id,
        local_filename=target_path,
        device_name=device_name,
        doorbells=doorbells,
    )
def main():
    """Authenticate, then poll Ring for active alerts forever and queue recordings for download.

    Returns immediately when authentication yields no client or no doorbells.
    Otherwise each poll refreshes the dings, handles every active alert and
    then sleeps for ``poll_interval`` seconds.
    """
    myring, doorbells = authenticate_and_initialize(gadget_type="doorbells")
    if not (myring and doorbells):
        return
    print(f"{ts()}: Entering observation loop - polling interval set to {poll_interval}s\n")
    while True:
        myring.update_dings()
        for alert in myring.active_alerts() or ():
            try:
                device_name = alert['doorbot_description']
                triggered_device, last_event_type, newest_event_id = check_device_has_active_alerts(
                    device_name, doorbells)
                if triggered_device and last_event_type and newest_event_id:
                    queue_download_to_local_file(newest_event_id, triggered_device, last_event_type,
                                                 device_name, doorbells)
            except Exception as exc:
                # A bare except here also swallowed KeyboardInterrupt, so Ctrl+C
                # during alert handling could not stop the script. Failures are
                # now reported before moving on to the next alert.
                print(f"{ts()}: Error while handling alert: {exc!r}")
                time.sleep(poll_interval)
        time.sleep(poll_interval)
# Start the monitor only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Echt waar? Oké, mocht je toch een stuk vertaald willen hebben, laat je het maar weten. Vertalen hoeft niet hoor.