Energyclient [portable] ❲TRUSTED❳
def control_load(self, state: bool):
    """Turn load on/off (demand response).

    Sends a POST to ``{api_root}/meters/{meter_id}/control`` with the JSON
    body ``{"load_control": state}``.

    Args:
        state: Value sent as ``load_control`` (True = on, False = off).

    Returns:
        True only when the response status code is exactly 200; any other
        status yields False rather than raising.

    Note:
        Exceptions raised by ``self.session.post`` itself are not caught
        here and propagate to the caller.
    """
    # The placeholders must be interpolated; without braces the request
    # would go to the literal text "self.api_root/...".
    url = f"{self.api_root}/meters/{self.meter_id}/control"
    resp = self.session.post(url, json={"load_control": state})
    return resp.status_code == 200
def __post_init__(self):
    """Finish construction: initialise the database, then the HTTP session.

    Calls ``self._init_db()`` first, then creates a ``requests.Session``
    stored on ``self.session`` whose default headers carry
    ``Authorization: Bearer <self.token>``.

    NOTE(review): the ``__post_init__`` name suggests the enclosing class
    is a dataclass -- confirm against the class header.
    """
    self._init_db()
    self.session = requests.Session()
    # headers.update() takes a mapping, and the token has to be interpolated
    # so the header carries the real credential, not the text "self.token".
    self.session.headers.update({"Authorization": f"Bearer {self.token}"})
def fetch_latest(self):
    """Fetch the meter's latest reading and cache it locally.

    Sends a GET to ``{api_root}/meters/{meter_id}/reading``, raises on an
    HTTP error status via ``raise_for_status()``, then stores the reading in
    the ``readings`` table through ``self.conn`` and commits.

    Returns:
        The decoded JSON payload of the response. It must contain the keys
        ``"power_w"`` and ``"cumulative_wh"``; a missing key raises
        ``KeyError`` before anything is written.

    NOTE(review): ``INSERT OR REPLACE`` is SQLite syntax, so ``self.conn``
    is presumably a ``sqlite3`` connection -- confirm where it is created.
    """
    # The placeholders must be interpolated; without braces the request
    # would go to the literal text "self.api_root/...".
    url = f"{self.api_root}/meters/{self.meter_id}/reading"
    resp = self.session.get(url)
    resp.raise_for_status()
    data = resp.json()

    # Cache it, keyed by the current UTC time in ISO-8601 form.
    self.conn.execute(
        "INSERT OR REPLACE INTO readings (ts, power_w, cumulative_wh) VALUES (?, ?, ?)",
        (
            datetime.now(timezone.utc).isoformat(),
            data["power_w"],
            data["cumulative_wh"],
        ),
    )
    self.conn.commit()
    return data