My cold home

Apache Spark and Home Assistant

Posted by Gregor Tätzner


My smart home network

For a couple of years now I have been running a small Home Assistant installation at home, but I never really tried to analyze the data. The network mainly consists of a couple of motion sensors, smart lights and switches that produce quite a lot of data points each day.

Export the data from Home Assistant

I decided to export all history data to my private cloud for easy access and long-term storage, with the help of a small Python script. The AppDaemon integration allows you to run Python scripts on your Home Assistant:

import appdaemon.plugins.hass.hassapi as hass
import requests
from requests.auth import HTTPBasicAuth
from datetime import datetime, timezone, time

username = "nextcloudusername"
app_pw = "nextcloudpw"

class LogHistory(hass.Hass):

  def initialize(self):
    # Create a time object for 1am
    run_time = time(1, 0, 0)
    # Schedule a daily callback that runs send_log() every night
    self.run_daily(self.send_log, run_time)

  def send_log(self, cb_args):
    headers = {'Authorization': 'Bearer homeassistanttoken'}
    r = requests.get("http://homeassistant.local:8123/api/history/period", headers=headers)
    if r.status_code == requests.codes.ok:
      now_utc = datetime.now(timezone.utc)
      log_time = now_utc.isoformat()
      filename = f"ha_history_{log_time}.json"
      url = f'<nextclouddomain>/remote.php/dav/files/{username}/<path>/{filename}'
      put_r = requests.put(url, data=r.content, auth=HTTPBasicAuth(username, app_pw))
      # WebDAV PUT answers 201 (created) or 204 (overwritten), so accept any 2xx
      if put_r.ok:
        self.log("History log uploaded")

Each night the script collects the history data for the last day from the Home Assistant API and pushes the JSON via WebDAV to a folder in my Nextcloud, neatly labeled with the current timestamp.
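For reference, one exported file is simply the raw response of /api/history/period: a JSON array with one inner array per entity, each holding state objects with entity_id, state, last_changed and attributes. A minimal sketch with invented values, just to illustrate the shape:

import json

# Made-up excerpt of an export file: a list of lists, one inner list per entity.
sample = json.loads("""
[
  [
    {"entity_id": "sensor.temperature_17",
     "state": "17.4",
     "last_changed": "2023-11-05T00:00:12+00:00",
     "attributes": {"device_class": "temperature", "unit_of_measurement": "°C"}}
  ]
]
""")
print(sample[0][0]["state"])  # -> 17.4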

Let it spark

I decided to use Google Colab as a Spark runtime, since it provides free compute and nice notebooks. But you could also run everything on your own machine for improved data security.
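If you go the local route, a plain pip install of PySpark together with a local-mode session is usually all you need; a minimal sketch, assuming Java is already installed on the machine:

# Local alternative to Colab (pip install pyspark), running Spark in local mode
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ha-history-local") \
    .master("local[*]") \
    .getOrCreate()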

On Colab, first install Spark with all of its requirements:

!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!test -f spark-3.5.0-bin-hadoop3.tgz || wget https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

import findspark
findspark.init()
findspark.find()

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
       .builder \
       .appName("Our First Spark example") \
       .getOrCreate()

spark

Now we are ready! Let's fetch the exported data again via WebDAV:

LOG_DATA_DIR = "log_data"
USERNAME = "nextclouduser"
APP_PW = "nextcloudpw"

import json
import requests
import os
import urllib.parse
from requests.auth import HTTPBasicAuth
from xml.dom import minidom
import pyspark.pandas as ps
from pyspark.sql.functions import min, max, avg, when
import matplotlib.pyplot as plt

def write_log(file_name, data):
  # The history API returns a list of lists (one inner list per entity);
  # flatten it into a single list of state objects before writing to disk.
  data_array = []
  for l in data:
    for s in l:
      data_array.append(s)

  os.makedirs(os.path.dirname(file_name), exist_ok=True)
  with open(file_name, 'w') as outfile:
    json.dump(data_array, outfile)

# fetch the directory listing from the cloud
propfind_url = f'https://<nextclouddomain>/remote.php/dav/files/{USERNAME}/<path>/'
propfind_files = requests.request(method="PROPFIND", url=propfind_url, auth=HTTPBasicAuth(USERNAME, APP_PW))
if propfind_files.status_code != 207:
  raise Exception("Could not read log files")

# parse the XML response and download every json file not cached locally yet
dom = minidom.parseString(propfind_files.text.encode('ascii', 'xmlcharrefreplace'))
for r in dom.getElementsByTagName('d:response'):
  href = r.getElementsByTagName('d:href')[0].firstChild.data
  content_type_tag = r.getElementsByTagName('d:getcontenttype')
  if len(content_type_tag):
    content_type = content_type_tag[0].firstChild.data
    if content_type == 'application/json':
      log_file_name = urllib.parse.quote(f'{LOG_DATA_DIR}/{href.split("/")[-1]}')
      if os.path.isfile(log_file_name):
        continue
      log_url = f'https://<nextclouddomain>/{href}'
      log_file = requests.get(url=log_url, auth=HTTPBasicAuth(USERNAME, APP_PW))
      if log_file.status_code == 200:
        write_log(log_file_name, log_file.json())

The PROPFIND method works like a directory listing. With the help of some ugly XML processing I download the JSON log files that are not available locally yet, since the Colab container provisioned by Google gets destroyed after some idle time.
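The same listing can also be parsed a bit less painfully with ElementTree and explicit namespaces; a sketch of the idea rather than the code used above, reusing the propfind_files response:

import xml.etree.ElementTree as ET

# Sketch: walk the 207 Multi-Status response using the DAV: namespace
# instead of matching the literal 'd:' prefix.
ns = {"d": "DAV:"}
root = ET.fromstring(propfind_files.content)
for resp in root.findall("d:response", ns):
    href = resp.findtext("d:href", namespaces=ns)
    content_type = resp.findtext(".//d:getcontenttype", namespaces=ns)
    if content_type == "application/json":
        print("json log file:", href)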

Query with SQL

The nice thing about Spark is that it lets you run regular SQL, without having to learn much new API. See here:

# run queries
df = spark.read.json(path=LOG_DATA_DIR, multiLine=True)
df.createOrReplaceTempView("sensordata")
tempDf = spark.sql("SELECT entity_id, last_changed, state from sensordata where entity_id = 'sensor.temperature_17' and state IS NOT NULL and state != 'unavailable'")
tempDf.show()

First we load all JSON files from the directory and register a view to use in SQL. Spark even infers the schema, including nested fields!

+--------------------+--------------------+-----+
|           entity_id|        last_changed|state|
+--------------------+--------------------+-----+
|sensor.temperatur...|2023-11-05T00:00:...| 17.4|
|sensor.temperatur...|2023-11-05T01:22:...| 17.3|
|sensor.temperatur...|2023-11-05T07:53:...| 17.4|
|sensor.temperatur...|2023-11-05T08:03:...| 17.5|
|sensor.temperatur...|2023-11-05T08:43:...| 17.6|
|sensor.temperatur...|2023-11-05T08:53:...| 17.5|
|sensor.temperatur...|2023-11-05T09:13:...| 17.4|
|sensor.temperatur...|2023-11-05T09:28:...| 17.3|
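
If you are curious what Spark inferred, printing the schema shows the nested attributes struct, and nested fields can be selected with dot notation (the exact fields depend on which entities you export):

# Inspect the inferred schema, including the nested 'attributes' struct
df.printSchema()

# Nested fields are addressable with dot notation
df.select("entity_id", "attributes.device_class").distinct().show(truncate=False)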

Better make some visuals:

# convert the string columns and plot the indoor temperature over time
lineDf = tempDf.toPandas()
lineDf.state = ps.to_numeric(lineDf.state)
lineDf.last_changed = ps.to_datetime(lineDf.last_changed)
lineDf.plot.line(x = "last_changed", y = ["state"], xlabel = 'Date', ylabel = 'Celsius', title = 'Erdgeschoss Eingang Temperatur', marker='.', markersize=5, lw=2, grid=True)
plt.legend(["Temp Haustür"])

# compare with the outside temperature reported by the weather integration
weatherDf = spark.sql("SELECT entity_id, last_changed, attributes.temperature as temp_outside from sensordata where entity_id = 'weather.pinguhome'")
weatherPdDf = weatherDf.toPandas()
weatherPdDf.temp_outside = ps.to_numeric(weatherPdDf.temp_outside)
weatherPdDf.last_changed = ps.to_datetime(weatherPdDf.last_changed)
weatherPdDf.plot.line(x = "last_changed", y = ["temp_outside"], xlabel = 'Date', ylabel = 'Celsius', title = 'Leipzig Temperatur', marker='.', markersize=5, lw=2, grid=True)
plt.legend(["Temp Outside"])

[Line plots: Haustür Temperatur and Leipzig Temperatur]

You can see small day/night fluctuations and a general downward trend.
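To put a number on that trend, the same view can be aggregated per day; a sketch against the sensordata view registered above, taking the date from the ISO timestamp prefix:

# Sketch: daily min/avg/max for the entrance temperature sensor
dailyDf = spark.sql("""
SELECT to_date(substring(last_changed, 1, 10)) AS day,
       MIN(CAST(state AS DOUBLE)) AS min_temp,
       AVG(CAST(state AS DOUBLE)) AS avg_temp,
       MAX(CAST(state AS DOUBLE)) AS max_temp
FROM sensordata
WHERE entity_id = 'sensor.temperature_17'
  AND state NOT IN ('unavailable', 'unknown')
GROUP BY to_date(substring(last_changed, 1, 10))
ORDER BY day
""")
dailyDf.show()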

activityDf = spark.sql("""
SELECT entity_id, COUNT(state) as triggered
FROM sensordata
WHERE attributes.device_class = 'motion' and state = 'on'
GROUP BY entity_id
""")
activityPsDf = activityDf.toPandas()
activityPsDf.plot.pie(y='triggered', labels=activityPsDf["entity_id"], autopct='%1.1f%%')

This second example groups all motion sensors and counts how often each one was triggered:

[Pie chart: motion sensor activity per entity]

The motion sensor in the kitchen is triggered the most, the one in the bedroom the least. Looks like we are always eating but never sleeping ;) Anyway, you could create nice profiles and see which rooms are used most. In the future I also want to analyze the battery runtime of the sensors, since keeping track of them and swapping batteries is an ongoing maintenance chore.
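As a first step in that direction, the battery levels should already be sitting in the same export; a sketch, assuming the battery sensors report device_class 'battery' with a percentage as state:

# Sketch: lowest reported level and last report time per battery sensor
batteryDf = spark.sql("""
SELECT entity_id,
       MIN(CAST(state AS DOUBLE)) AS lowest_level,
       MAX(last_changed) AS last_report
FROM sensordata
WHERE attributes.device_class = 'battery'
  AND state NOT IN ('unavailable', 'unknown')
GROUP BY entity_id
ORDER BY lowest_level
""")
batteryDf.show(truncate=False)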

Let's see what we can do in the next article!