Odjezdova-tabule-MHD/server/lora.py
2022-12-19 13:21:03 +01:00

92 lines
2.4 KiB
Python

import requests
from requests.structures import CaseInsensitiveDict
import json
import threading
from operator import itemgetter
from base64 import b64encode
from datetime import datetime
from random import randint, shuffle
from time import sleep
from departures import Departure
import urllib3
# Requests in this module are sent with verify=False; silence the
# InsecureRequestWarning that urllib3 would otherwise emit for each of them.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class LoraController:
    """Owner of the LoRa API bearer token and of the registered devices."""

    # Seconds to wait for the login endpoint. Without an explicit timeout
    # ``requests`` keeps waiting on a server that stops responding.
    REQUEST_TIMEOUT = 10

    def __init__(self, main):
        """Create an empty controller bound to the application object.

        Args:
            main: Application object. ``generate_token`` reads
                ``main.config["lora_api"]`` from it.
        """
        self.main = main
        self.storage = dict()  # not used by the code visible in this class
        self.devices = []  # LoraDevice instances created through new()
        self.token = ""  # bearer token; empty until generate_token() succeeds

    def generate_token(self):
        """Log in to the LoRa API and store the returned token in ``self.token``.

        Raises:
            KeyError: If the configuration lacks the credentials or the
                response JSON has no ``"token"`` entry.
            requests.RequestException: On connection failure or when the
                server does not answer within ``REQUEST_TIMEOUT`` seconds.
        """
        url = "https://lora.plzen.eu/api/v2/login"
        headers = CaseInsensitiveDict()
        headers["Content-Type"] = "application/json"
        credentials = {
            "email": self.main.config["lora_api"]["username"],
            "password": self.main.config["lora_api"]["password"],
        }
        # NOTE(review): verify=False disables TLS certificate checking while
        # credentials are being sent -- confirm this is intentional.
        response = requests.post(
            url,
            verify=False,
            headers=headers,
            data=json.dumps(credentials),
            timeout=self.REQUEST_TIMEOUT,
        )
        self.token = response.json()["token"]

    def new(self, id: int, stop_id: str):
        """Create a ``LoraDevice`` for the given node and stop and register it.

        Args:
            id: Numeric device identifier handed to ``LoraDevice``.
            stop_id: Identifier of the stop handed to ``LoraDevice``.
        """
        self.devices.append(LoraDevice(self, id, stop_id))
class LoraDevice:
    """One LoRa node that receives queued messages for a single stop.

    Messages are stored as zero-argument callables and turned into text only
    when a background thread, started on demand, is about to post them.
    """

    # Seconds to wait for the queue endpoint. Without a timeout a stalled
    # request would keep the worker thread alive forever, and send() would
    # then never start a fresh worker to drain the pool.
    REQUEST_TIMEOUT = 10

    def __init__(self, controller: "LoraController", deveui: int, stop_id: str):
        """Set up the device and immediately queue a ``CLEAR`` message.

        Args:
            controller: Controller supplying ``token`` and ``main.ended``.
            deveui: Device identifier; formatted as 16 hex digits in the URL.
            stop_id: Stop identifier passed to ``Departure.get``.
        """
        self.controller = controller
        self.id = deveui
        self.stop_id = stop_id
        self.message_pool = []  # pending callables, oldest first
        self.thread = None  # current/last worker thread, if any
        self.port = 1  # fPort for the next message, cycles 1..3
        self.send(lambda: "CLEAR")

    def get_updated_departures(self):
        """Queue the current time, then every departure with ``updated > 0``."""
        self.send_time()
        for d in Departure.get(self.stop_id):
            if d.updated > 0:
                # d.data is queued uncalled; the worker invokes it when the
                # message is actually sent.
                self.send(d.data)

    def send_time(self):
        """Queue a ``TIME|<seconds>`` message evaluated at send time."""
        # NOTE(review): datetime.now() is naive local time, so the value
        # differs from the true Unix timestamp by the local UTC offset --
        # presumably intended so the device shows local time; confirm.
        self.send(lambda: f"TIME|{(datetime.now() - datetime(1970, 1, 1)).total_seconds():.0f}")

    def send(self, msg):
        """Append a message callable and make sure a worker thread is running.

        Args:
            msg: Zero-argument callable returning the message text; a falsy
                result makes the worker skip the message.
        """
        self.message_pool.append(msg)
        # NOTE(review): a worker that has already seen the pool empty but has
        # not finished yet still reports is_alive(); a message appended in
        # that window waits until the next send() call.
        if not self.thread or not self.thread.is_alive():
            self.thread = threading.Thread(target=self.thread_sending)
            self.thread.start()

    def thread_sending(self):
        """Worker loop: post queued messages until the pool is empty.

        Stops early when ``controller.main.ended`` becomes true. A failed
        request is reported on stdout and the loop carries on with the next
        message instead of dying with messages still queued.
        """
        while not self.controller.main.ended:
            if not self.message_pool:
                break
            message = self.message_pool.pop(0)()
            if not message:
                continue
            url = f"https://lora.plzen.eu/api/v2/nodes/{self.id:0>16x}/queue"
            headers = CaseInsensitiveDict()
            headers["Content-Type"] = "application/json"
            headers["Authorization"] = f"Bearer {self.controller.token}"
            payload = {
                "confirmed": True,
                "data": b64encode(message.encode("utf-8")).decode("ascii"),
                "fPort": self.port,
                # NOTE(review): "string" looks like a placeholder -- confirm.
                "reference": "string"
            }
            print(f"{self.id:0>16x} > {message}")
            try:
                requests.post(
                    url,
                    verify=False,
                    headers=headers,
                    data=json.dumps(payload),
                    timeout=self.REQUEST_TIMEOUT,
                )
            except requests.RequestException as exc:
                # Keep the worker alive; the failed message is dropped.
                print(f"{self.id:0>16x} ! {exc}")
            sleep(.5)
            self.port += 1
            if self.port > 3:
                self.port = 1
                # NOTE(review): assumes the 5 s pause belongs to the
                # wrap-around branch (one long pause per three messages)
                # -- confirm.
                sleep(5)