J1B3-Sensor-boxes/docs/rpi-documentation/Databaseconnection.md
2024-04-04 17:04:30 +02:00


Original Database - WebSocket Connection Version (by Bram)

For our project, we needed to establish an efficient and functional connection between a live data collector and a database where this data would be stored.

The data we collected came from live "nodes": small boxes equipped with various types of sensors that gathered specific data from their surroundings.

This collected data needed to be transmitted to a database to facilitate its presentation on a website we were developing. The website would retrieve the data from the database and display it in various formats.

Given the critical nature of this data connection for our project, it was imperative that it functioned reliably. Bram was tasked with designing a connection in Python between the WebSocket (live server) and the database (data storage).

Since the WebSocket data was available on a Raspberry Pi, it made sense to implement the connection on the Pi itself. This also gave Bram the opportunity to learn Python, a language he had no experience with at the start.

Python code + explanation

On the Raspberry Pi, a file named "data.py" was created so that the script could be run whenever it was needed.

The file starts by importing the required libraries.

import asyncio
import websockets
import mysql.connector
import json

Next, the process_data() function opens a connection with mysql.connector.connect(), passing the login credentials of the database that was set up. This connection and its cursor are used several times later in the code. Note that a new connection is opened for every message that arrives.

async def process_data(data):
    # start as None so the finally block only closes what was actually opened
    mydb = None
    cursor = None
    try:
        mydb = mysql.connector.connect(
            host="localhost",
            user="root",
            # password hidden for privacy
            password="********",
            database="NodeData"
        )
        cursor = mydb.cursor()

This next part does several things, so it is split up for clarity.

It begins by creating a second cursor, MACDataReading, and using it to select the existing MAC addresses from the Node table. The result is stored in a list later on.

Next, an INSERT query is defined that acts as a "mold" for the data sent to the database. The %s placeholders are not filled in yet, because the values will be the data collected from the nodes.

        #variable to connect to the DB
        MACDataReading = mydb.cursor()
        #get data from DB
        MACDataReading.execute("SELECT MAC FROM Node")
        #make a mold for the data to get to the DB
        query = "INSERT INTO `Measurement` (NodeID, Type, Value) VALUES (%s, %s, %s)"
        processedData = json.loads(data)
        processedTemp = (processedData['Temp'])
        processedHumi = (processedData['Humi'])
        processedeCO2 = (processedData['eCO2'])
        processedTvoc = (processedData['TVOC'])
        processedMAC = (processedData['node'])
        # fetchall() returns rows as tuples, so the MAC is wrapped in a tuple to compare
        MACTuple = (processedMAC,)

        MACDataFetching = MACDataReading.fetchall()
        MACArray = list(MACDataFetching)
        MACDataReading.close()

        # add the node if its MAC address is not in the database yet
        if MACTuple not in MACArray:
            addingNode = "INSERT INTO `Node` (MAC) VALUES (%s)"
            cursor.execute(addingNode, MACTuple)
            mydb.commit()

        # NodeID is hard-coded to 1, so every measurement is stored under node 1
        pushingDataArray = [(1, "Temp", processedTemp), (1, "Humi", processedHumi), (1, "eCO2", processedeCO2), (1, "TVOC", processedTvoc)]
        for i in pushingDataArray:
            print(query, i)
            cursor.execute(query, i)
            mydb.commit()

    except mysql.connector.Error as err:
        print("MySQL Error:", err)
    finally:
        # close only what was opened, so a failed connect() doesn't raise a NameError here
        if cursor is not None:
            cursor.close()
        if mydb is not None:
            mydb.close()

async def receive_data():
    uri = "ws://145.92.8.114/ws"

    try:
        async with websockets.connect(uri) as websocket:
            while True:
                data = await websocket.recv()
                print(f"Received data: {data}")
                await process_data(data)
    except websockets.ConnectionClosedError as e:
        print("WebSocket connection closed:", e)

async def main():
    await receive_data()

asyncio.run(main())
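The MAC check in process_data() works because fetchall() returns every row as a tuple, which is why the MAC address is wrapped in MACTuple before the "not in" test. The sketch below reproduces only that check so it can be run without a MySQL server. It swaps in Python's built-in sqlite3 module (which uses ? placeholders instead of %s), assumes a simplified Node table, and uses a hypothetical helper name, register_mac.

```python
import sqlite3

# In-memory SQLite database standing in for MySQL; the Node schema is simplified
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE Node (NodeID INTEGER PRIMARY KEY, MAC TEXT)")


def register_mac(mac):
    """Add mac to Node unless it is already there; return True if it was added."""
    MACTuple = (mac,)
    # fetchall() gives a list of 1-element tuples such as [("AA:01",)],
    # so the MAC is wrapped in a tuple before the membership test
    MACArray = db.execute("SELECT MAC FROM Node").fetchall()
    if MACTuple in MACArray:
        return False
    db.execute("INSERT INTO Node (MAC) VALUES (?)", MACTuple)
    db.commit()
    return True


print(register_mac("AA:01"))  # True
print(register_mac("AA:01"))  # False
print(db.execute("SELECT COUNT(*) FROM Node").fetchone()[0])  # 1
```

As a design alternative (not what the original code does), the database could answer the question itself, for example with a SELECT ... WHERE MAC = %s query or a UNIQUE constraint on MAC, instead of downloading every MAC address for each message.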

New Version (by Sietse)

Changes made

The original code was a good start, but it had some issues: it could only handle data from the sensorNodes, and it didn't store the correct nodeID with the measurements (every measurement was saved with NodeID 1).

Since we have two kinds of nodes (sensorNodes and enqueteNodes), we needed another function to commit the enqueteData to the database. I also made a filter that determines which data comes from the sensorNodes and which comes from the enqueteNodes, so the data is committed to the right table in the database.

I also added a function that looks up the nodeID from the MAC address, so the data is committed with the right node in the database.

The new "filter" code

Function to get a list of MAC addresses from the sensorNodes and enqueteNodes

To filter, I made two lists: one with all the MAC addresses of the sensorNodes and the other with the MAC addresses of the enqueteNodes.

The function that handles that and updates the list is the following:

async def getNodeInfo(type):
    global sensorNodeArray
    global enqueteNodeArray

    nodeInfoArray = []

    id = (type,)
    mydb = dbLogin()
    cursor = mydb.cursor()
    cursor.execute("""SELECT MAC FROM Node WHERE Type = %s""", id)
    nodeInfo = cursor.fetchall()

    for tuples in nodeInfo:
        for item in tuples:
            nodeInfoArray.append(item)
    print(nodeInfoArray)

    cursor.close()
    mydb.close()

    if type == 'sensor':
        sensorNodeArray = nodeInfoArray
        print(sensorNodeArray)
        return sensorNodeArray

    elif type == 'enquete':
        enqueteNodeArray = nodeInfoArray
        return enqueteNodeArray

As you can see, it works like this:

  1. It builds a query for the MAC addresses of the requested node type (sensor or enquete).

  2. It executes the query; fetchall() returns the rows as a list of tuples.

  3. It uses a nested for loop to get the MAC addresses out of the tuples and puts them in nodeInfoArray.

  4. Depending on the type, it updates sensorNodeArray or enqueteNodeArray with the new data (nodeInfoArray).

  5. It returns the array with the data.
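Step 3 flattens the list of one-element tuples that fetchall() returns into a plain list of MAC addresses. Here is that step on its own, with made-up MAC addresses, next to an equivalent list comprehension. The function names are hypothetical.

```python
def flatten_macs(rows):
    """Turn fetchall() output like [("AA",), ("BB",)] into ["AA", "BB"]."""
    # Nested loop, the same approach as getNodeInfo
    nodeInfoArray = []
    for tuples in rows:
        for item in tuples:
            nodeInfoArray.append(item)
    return nodeInfoArray


def flatten_macs_compact(rows):
    # Equivalent list comprehension: outer loop over rows, inner loop over values
    return [item for tuples in rows for item in tuples]


rows = [("AA:BB:CC:DD:EE:01",), ("AA:BB:CC:DD:EE:02",)]
print(flatten_macs(rows))          # ['AA:BB:CC:DD:EE:01', 'AA:BB:CC:DD:EE:02']
print(flatten_macs_compact(rows))  # ['AA:BB:CC:DD:EE:01', 'AA:BB:CC:DD:EE:02']
```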

The filter code

Now that we have the lists of MAC addresses, we can filter the data from the WebSocket.

    data = await websocket.recv()

    processedData = json.loads(data)
    macAdress = processedData['node']

    if "Temp" in processedData:
        type = 'sensor'
    else:
        type = 'enquete'

    await getNodeInfo('sensor')
    await getNodeInfo('enquete')

    if macAdress in sensorNodeArray:
        nodeID = await getNodeID(macAdress)
        await processSensorNodeData(data, nodeID)
    elif macAdress in enqueteNodeArray:
        nodeID = await getNodeID(macAdress)
        await processEnqueteNodeData(data, nodeID)
    else:
        # unknown MAC address: register it as a new node of the detected type
        await newNode(macAdress, type)

As you can see, it's a lot of code to explain, so to make it easier I made a Mermaid diagram that shows how the code works.

graph TD
    A[Get data from websocket] --> B[Set type to sensor if the data has Temp, else enquete]
    B --> C[Refresh sensorNodeArray and enqueteNodeArray]
    C --> D{Which array contains the MAC address?}
    D -->|sensorNodeArray| E[Get nodeID and process sensorNodeData]
    D -->|enqueteNodeArray| F[Get nodeID and process enqueteNodeData]
    D -->|neither| G[Add new node to database with that type]
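The decisions in the filter code can be isolated in a small pure function, which makes them easy to test without a WebSocket or a database. This is only a sketch: the name route and its return values are hypothetical, and because the enquete message fields aren't described here, the enquete examples contain only a node field.

```python
import json


def route(data, sensorNodeArray, enqueteNodeArray):
    """Return (action, type) for one WebSocket message.

    action is "sensor", "enquete" or "newNode"; type is the node type
    that newNode() would be given if the MAC address is unknown.
    """
    processedData = json.loads(data)
    macAdress = processedData["node"]

    # Sensor messages contain a "Temp" field; anything else counts as enquete data
    nodeType = "sensor" if "Temp" in processedData else "enquete"

    # The known MAC lists decide where the data goes, not the detected type
    if macAdress in sensorNodeArray:
        return "sensor", nodeType
    if macAdress in enqueteNodeArray:
        return "enquete", nodeType
    return "newNode", nodeType


sensors = ["AA:01"]
enquetes = ["BB:02"]
print(route('{"node": "AA:01", "Temp": 20.5}', sensors, enquetes))  # ('sensor', 'sensor')
print(route('{"node": "CC:03"}', sensors, enquetes))  # ('newNode', 'enquete')
```

Because the MAC lists are checked first, data from a known node always goes to the table of its registered type; the detected type only matters when a new node is added.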

The function to get the nodeID

This function is used to get the nodeID from the MAC address. This way we can commit the data with the right ID in the database.

The function to get the nodeID from the MAC address is the following:

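async def getNodeID(macAdress):
    id = (macAdress,)
    mydb = dbLogin()
    cursor = mydb.cursor()
    cursor.execute("""SELECT nodeID FROM Node WHERE MAC = %s""", id)
    data = cursor.fetchall()

    # stays None if the MAC address is not in the Node table
    nodeID = None
    for tuples in data:
        for item in tuples:
            nodeID = item

    # release the cursor and connection opened above
    cursor.close()
    mydb.close()

    return nodeID

  1. It gets the nodeID from the database with the MAC address.
  2. It executes the command and puts the result (a list of tuples) in data.
  3. It uses a nested for loop to get the value out of the tuples and stores it in nodeID, which stays None if no node has that MAC address.
  4. It closes the cursor and the database connection.
  5. It returns the nodeID.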
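Since a MAC address should match at most one node, fetchone() can replace fetchall() and the nested loop. A sketch under two assumptions: SQLite stands in for MySQL (with ? placeholders), and the connection is passed in instead of calling dbLogin(). The helper name get_node_id is hypothetical.

```python
import sqlite3


def get_node_id(db, macAdress):
    """Return the NodeID for a MAC address, or None if the MAC is unknown."""
    cursor = db.cursor()
    try:
        # MAC is assumed to be unique, so at most one row is expected
        cursor.execute("SELECT NodeID FROM Node WHERE MAC = ?", (macAdress,))
        row = cursor.fetchone()  # a 1-element tuple, or None when nothing matched
        return row[0] if row is not None else None
    finally:
        cursor.close()


# In-memory stand-in for the Node table (schema simplified and assumed)
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE Node (NodeID INTEGER PRIMARY KEY, MAC TEXT, Type TEXT)")
db.execute("INSERT INTO Node (MAC, Type) VALUES (?, ?)", ("AA:01", "sensor"))
db.commit()

print(get_node_id(db, "AA:01"))  # 1
print(get_node_id(db, "ZZ:99"))  # None
```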

The function to commit the data from the sensorNodes

This function is a lot like the original one. The only two changes are that it now also commits the nodeID, and that the code for adding a new node has moved to a separate function.

Link to code
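The function itself is only linked, not shown here. As a hedged illustration of the change described above, here is a sketch of committing one sensor message with a given nodeID. It assumes the same four fields (Temp, Humi, eCO2, TVOC) and Measurement columns as the original version, uses sqlite3 in place of MySQL, and uses a hypothetical function name, process_sensor_node_data. It is not the linked implementation.

```python
import json
import sqlite3

# Same "mold" as the original, but with the real nodeID instead of 1
# (sqlite3 placeholders are ? where mysql.connector uses %s)
QUERY = "INSERT INTO Measurement (NodeID, Type, Value) VALUES (?, ?, ?)"
SENSOR_FIELDS = ("Temp", "Humi", "eCO2", "TVOC")


def process_sensor_node_data(db, data, nodeID):
    processedData = json.loads(data)
    rows = [(nodeID, field, processedData[field]) for field in SENSOR_FIELDS]
    db.executemany(QUERY, rows)
    # One commit for all four rows instead of one commit per row
    db.commit()


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE Measurement (NodeID INTEGER, Type TEXT, Value REAL)")
process_sensor_node_data(
    db, '{"node": "AA:01", "Temp": 21.5, "Humi": 40, "eCO2": 400, "TVOC": 3}', 7
)
print(db.execute("SELECT * FROM Measurement").fetchall())
```

Committing once per message rather than once per row is a choice made for the sketch; the original version commits after each measurement.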

The function to commit the data from the enqueteNodes

This function is a lot like the sensorNode function, except that it commits the data to the enqueteData table in the database and handles a different set of data fields.

Link to code

The function to add a new node to the database

This function adds a new node to the database. It is used when a node sends data over the WebSocket but is not in the database yet.

The function to add a new node to the database is the following:

async def newNode(mac, type):
    id = (mac, type)
    mydb = dbLogin()

    cursor = mydb.cursor()
    cursor.execute("INSERT INTO `Node` (MAC, Type) VALUES (%s, %s)", id)
    print("new node assigned")
    mydb.commit()

    # release the cursor and connection opened above
    cursor.close()
    mydb.close()

  1. It gets the MAC address and the type of node from the arguments.
  2. It executes the command to add the new node to the database.
  3. It prints that a new node is assigned.
  4. It commits the data to the database.
  5. It closes the cursor and the database connection.
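In the filter code, an unknown MAC address only leads to newNode(); the message that triggered it is not processed. If the new node's ID is needed right away, for example to also store that first message, the cursor's lastrowid attribute gives the ID the database just generated without a second query. A sketch using sqlite3 in place of MySQL and a hypothetical new_node helper; with mysql.connector this assumes nodeID is an AUTO_INCREMENT column.

```python
import sqlite3


def new_node(db, mac, type_):
    """Insert a node and return the NodeID the database assigned to it."""
    cursor = db.cursor()
    try:
        cursor.execute("INSERT INTO Node (MAC, Type) VALUES (?, ?)", (mac, type_))
        db.commit()
        # ID generated for the row just inserted; mysql.connector cursors
        # also provide a lastrowid attribute
        return cursor.lastrowid
    finally:
        cursor.close()


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE Node (NodeID INTEGER PRIMARY KEY, MAC TEXT, Type TEXT)")
print(new_node(db, "AA:01", "sensor"))   # 1
print(new_node(db, "BB:02", "enquete"))  # 2
```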