Odjezdova-tabule-MHD/server/lora.py

103 lines
2.7 KiB
Python

import requests
from requests.structures import CaseInsensitiveDict
import json
import threading
from operator import itemgetter
from base64 import b64encode
from datetime import datetime
from random import randint, shuffle
from time import sleep
from departures import Departure
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class LoraController:
def __init__(self, main):
self.main = main
self.devices = []
self.token = ""
def generate_token(self):
url = "https://lora.plzen.eu/api/v2/login"
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
data = {
"email": self.main.config["lora_api"]["username"],
"password": self.main.config["lora_api"]["password"]
}
data = requests.post(url, verify=False, headers=headers, data=json.dumps(data)).json()
self.token = data["token"]
def new(self, id: int, stop_id: str):
self.devices.append(LoraDevice(self, id, stop_id))
def json(self):
resp = []
for d in self.devices:
resp.append({'id': f"{d.id:0>16x}", 'stop_id': d.stop_id})
return resp
class LoraDevice:
def __init__(self, controller: LoraController, deveui: int, stop_id: str):
self.controller = controller
self.id = deveui
self.stop_id = stop_id
self.message_pool = []
self.thread = None
self.port = 1
self.clear()
def set_stop(self, stop_id: str):
self.clear()
self.stop_id = stop_id
def get_updated_departures(self):
self.send_time()
for d in Departure.get(self.stop_id):
if self.id not in d.updated or d.updated[self.id] > 0:
self.send(d.data)
def clear(self):
self.send(lambda id: "CLEAR")
def send_time(self):
self.send(lambda id: f"TIME|{(datetime.now() - datetime(1970, 1, 1)).total_seconds():.0f}")
def send(self, msg):
self.message_pool.append(msg)
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(target=self.thread_sending)
self.thread.start()
def thread_sending(self):
while not self.controller.main.ended:
if len(self.message_pool) == 0:
break
message = self.message_pool.pop(0)(self.id)
if not message:
continue
url = f"https://lora.plzen.eu/api/v2/nodes/{self.id:0>16x}/queue"
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"
headers["Authorization"] = f"Bearer {self.controller.token}"
payload = {
"confirmed": True,
"data": b64encode(message.encode("utf-8")).decode("ascii"),
"fPort": self.port,
"reference": "string"
}
print(f"{self.id:0>16x} > {message}")
requests.post(url, verify=False, headers=headers, data=json.dumps(payload))
sleep(.5)
self.port += 1
if self.port > 3:
self.port = 1
sleep(5)