J1B3-Sensor-boxes/docs/rpi-documentation/Databaseconnection.md
2024-04-05 13:18:21 +02:00

Original Database - WebSocket connection (by Bram)

For our project, we needed to establish an efficient and functional connection between a live data collector and a database where this data would be stored.

The data we collected originated from live "nodes" which were small boxes equipped with various types of sensors to gather specific data from their surroundings.

This collected data needed to be transmitted to a database to facilitate its presentation on a website we were developing. The website would retrieve the data from the database and display it in various formats.

This is where the connection would be inserted (image: my connection).

Given how heavily our project depended on this data connection, it was important that it was reliable. Bram was given the task of designing a connection in Python between the WebSocket (live server) and the database (data storage).

Since we had the WebSocket data on a Raspberry Pi, it made sense to make the connection on the Pi itself. This became an opportunity for Bram to gain knowledge about Python, considering he originally didn't have experience with this programming language.

Python code + explanation

On the Raspberry Pi, a file named "data.py" was created, from which this script could be run when needed.

At the beginning of the file, the code starts with importing the different required libraries.

# This library lets functions (coroutines) run concurrently.
import asyncio
# This library makes the connection to the WebSocket possible.
import websockets
# This library makes a connection with the database.
import mysql.connector
# This library makes JSON data readable.
import json

Next, the connect function of the "mysql.connector" library is called with the login credentials of the database you have set up. The resulting connection is used several times later in the code.

# async marks this function as a coroutine, for use with the asyncio library.
async def process_data(data):
    try:
        mydb = mysql.connector.connect(
            host="localhost",
            user="root",
            # password hidden for privacy
            password="********",
            database="NodeData"
        )
        cursor = mydb.cursor()

This next part has a lot of different functions, so it will be split up for clarity.

It begins with creating a variable to retrieve information from a specific part of the database. This information is then stored in an array later on. In this case, it is selecting the existing MAC addresses from the database.

Afterward, a query is made for a different part of the code and acts as a "mold" for the data to be sent to the database. The values are not inserted yet because these will be the data collected from the nodes.

        #variable to connect to the DB
        MACDataReading = mydb.cursor()
        #get data from DB
        MACDataReading.execute("SELECT MAC FROM Node")
        #make a mold for the data to get to the DB
        query = "INSERT INTO `Measurement` (NodeID, Type, Value) VALUES (%s, %s, %s)"

In the next part of the code, the data collected from the node (which is done later on) is processed here and made readable for the Python code.

This processed data is then split up into five different types: the temperature, the humidity, the eCO2 and TVOC values, and the MAC address from which this information was sent.

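The MAC address is then taken and turned into a tuple. This is done because cursor.execute() expects its query parameters as a tuple (or another sequence) instead of a bare string, and because the rows that fetchall() returns are tuples as well, which matters for the comparison later on.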

(for more information on tuples I suggest visiting https://www.w3schools.com/python/python_tuples.asp)

        #load the json data and make it readable.
        processedData = json.loads(data)
        #divide the data into types
        processedTemp = (processedData['Temp'])
        processedHumi = (processedData['Humi'])
        processedeCO2 = (processedData['eCO2'])
        processedTvoc = (processedData['TVOC'])
        processedMAC = (processedData['node'])
        #make a tuple of the MAC by placing a comma.
        MACTuple = (processedMAC,)
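The parsing step can be tried on its own. The sketch below assumes a message shaped like the one the script reads; the field names come from the code above, but the values are invented for illustration.

```python
import json

# Hypothetical payload: the keys match the script, the values are made up.
raw = '{"node": "AA:BB:CC:DD:EE:FF", "Temp": 21.5, "Humi": 40.2, "eCO2": 400, "TVOC": 12}'

processed = json.loads(raw)   # JSON text -> Python dict
temp = processed["Temp"]
mac = processed["node"]

mac_tuple = (mac,)            # the trailing comma makes a one-element tuple
not_a_tuple = (mac)           # without the comma this is still just the string

print(type(mac_tuple).__name__, type(not_a_tuple).__name__)
```

The last two assignments show why the comma matters: parentheses alone do not create a tuple.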

Coming back to the previous lines of code, the data which was first asked for is now gathered and put into an array.

This array is then examined, and all the data is compared to the newly obtained MAC address.

If it is not found, then the new MAC address is added to the database. This makes automation much easier and makes the process of adding a new node easy.

        #fetching data and adding to an array.
        MACDataFetching = MACDataReading.fetchall()
        MACArray = list(MACDataFetching)
        #see if the given MAC is not in the array.
        if MACTuple not in MACArray:
          #a query to insert the new MAC in the DB
          addingNode = "INSERT INTO `Node` (MAC) VALUES (%s)"
          #combine the query and the data and push it.
          cursor.execute(addingNode, MACTuple)
          mydb.commit()
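The membership check works because fetchall() returns a list of tuples, so the one-element MAC tuple can be compared directly. The sketch below shows the same idea, with Python's built-in sqlite3 module standing in for MySQL so it runs without a server. This is an assumption made for illustration: sqlite3 uses `?` placeholders where mysql.connector uses `%s`, and the helper name `ensure_node` is hypothetical.

```python
import sqlite3

# In-memory stand-in for the Node table (assumption: a single MAC column).
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE Node (MAC TEXT)")
db.execute("INSERT INTO Node (MAC) VALUES (?)", ("AA:AA:AA:AA:AA:AA",))
db.commit()

def ensure_node(db, mac):
    """Insert mac into Node if it is not there yet; return True when inserted."""
    known = db.execute("SELECT MAC FROM Node").fetchall()  # list of 1-tuples
    if (mac,) in known:
        return False
    db.execute("INSERT INTO Node (MAC) VALUES (?)", (mac,))
    db.commit()
    return True

first = ensure_node(db, "BB:BB:BB:BB:BB:BB")   # unknown MAC: gets added
second = ensure_node(db, "BB:BB:BB:BB:BB:BB")  # now known: nothing happens
node_count = db.execute("SELECT COUNT(*) FROM Node").fetchone()[0]
```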

From here, the data collected from the WebSocket is placed in an array together with a few guidelines (the node ID and the measurement type) so that it ends up in the correct fields in the database.

After looping over all entries of the array, each entry is pushed together with the query to properly enter the database.

Sadly, this version of the code is only able to push the data from one node because of some errors within the database. (This is later fixed in the updated version my teammate made.)

        #making an array with the data to sort it and be able to be pushed to the database.
        pushingDataArray = [(1, "Temp", processedTemp), (1, "Humi", processedHumi), (1, "eCO2", processedeCO2), (1, "TVOC", processedTvoc)]
        #go along all instances in the array, and combine this with the query.
        for i in pushingDataArray:
            print(query ,i)
            cursor.execute(query, i)
            mydb.commit()
    #in the case of an error, show what and where, and after, close the database connection.
    except mysql.connector.Error as err:
        print("MySQL Error:", err)
    finally:
        cursor.close()
        mydb.close()
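The hard-coded `1` in the array is what ties this version to a single node. A minimal sketch of how the rows could be built for any node ID is shown below; the helper name `build_measurement_rows` and the sample values are hypothetical and not part of the original script.

```python
# Same query "mold" as in the script above.
INSERT_QUERY = "INSERT INTO `Measurement` (NodeID, Type, Value) VALUES (%s, %s, %s)"
MEASUREMENT_TYPES = ("Temp", "Humi", "eCO2", "TVOC")

def build_measurement_rows(node_id, processed):
    """Return one (NodeID, Type, Value) tuple per measurement type."""
    return [(node_id, kind, processed[kind]) for kind in MEASUREMENT_TYPES]

# Invented sample values, shaped like the parsed JSON message.
sample = {"node": "AA:BB:CC:DD:EE:FF", "Temp": 21.5, "Humi": 40.2, "eCO2": 400, "TVOC": 12}
rows = build_measurement_rows(1, sample)

# With a mysql.connector cursor, the rows could then be sent in one call:
#     cursor.executemany(INSERT_QUERY, rows)
#     mydb.commit()
```

Building the rows in a separate function keeps the node ID as a parameter, which is the direction the new version takes.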

The next function establishes the connection with the WebSocket and collects the data sent by the nodes. This data is then stored in a variable named "data". (This is the same data that was processed above to make it readable for Python and split up into different types.)

This function also verifies if the WebSocket connection can be established and provides an error message when this is not the case.

async def receive_data():
    uri = "ws://145.92.8.114/ws"

    try:
        async with websockets.connect(uri) as websocket:
            while True:
                data = await websocket.recv()
                print(f"Received data: {data}")
                await process_data(data)
    except websockets.ConnectionClosedError as e:
        print("WebSocket connection closed:", e)

This is the last function: main() simply awaits receive_data(), and asyncio.run(main()) starts the event loop. Data is only processed after it has been received over the WebSocket connection, which prevents false data from entering the database.

async def main():
    await receive_data()

asyncio.run(main())
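The control flow of receive_data() and main() can be tried without a server. In the sketch below, a hypothetical `FakeSocket` class stands in for the real websockets connection and `ConnectionError` stands in for a closed connection; both are assumptions made for illustration only.

```python
import asyncio

class FakeSocket:
    """Hypothetical stand-in for a WebSocket: recv() hands out queued messages."""

    def __init__(self, messages):
        self._messages = list(messages)

    async def recv(self):
        if not self._messages:
            # Stands in for the connection being closed by the server.
            raise ConnectionError("no more messages")
        await asyncio.sleep(0)  # yield to the event loop, as a real recv() would
        return self._messages.pop(0)

received = []

async def process_data(data):
    # The real script writes to the database here; this sketch only records.
    received.append(data)

async def receive_data(socket):
    try:
        while True:
            data = await socket.recv()
            await process_data(data)
    except ConnectionError as e:
        print("connection closed:", e)

async def main():
    await receive_data(FakeSocket(['{"node": "A"}', '{"node": "B"}']))

asyncio.run(main())
```

Each message is fully processed before the next recv() is awaited, which is also how the original loop behaves.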

As a summary, this code is meant to establish connections both to the database and the WebSocket to enable a data connection between them. When new data arrives, it will be pushed to the database, and if a new MAC address is encountered, it will be added to the list of addresses.

(The link to the code https://gitlab.fdmci.hva.nl/propedeuse-hbo-ict/onderwijs/2023-2024/out-a-se-ti/blok-3/qaajeeqiinii59/-/blob/main/server/web-data-connection/data.py?ref_type=heads)

New Version (by Sietse)

Changes made

The original code was a good start, but it had some issues. The code could only handle the data from the sensorNodes and didn't include the nodeID for measurements.

Since we have two kinds of nodes (sensorNodes and enqueteNodes), we needed another function to commit the enqueteData to the database. I have also made a filter to tell which data comes from the sensorNodes and which comes from the enqueteNodes. This way we can commit the data to the right table in the database.

I have also added a function to get the nodeID from the MAC address. This way we can commit the data to the right node in the database.

The new "filter" code

Function to get a list of MAC addresses from the sensorNodes and enqueteNodes

To filter, I made two lists: one with all the MAC addresses of the sensorNodes and the other with the MAC addresses of the enqueteNodes.

The function that handles this and updates the lists is the following:

async def getNodeInfo(type):
    global sensorNodeArray
    global enqueteNodeArray

    nodeInfoArray = []

    id = (type,)
    mydb = dbLogin()
    cursor = mydb.cursor()
    cursor.execute("""SELECT MAC FROM Node WHERE Type = %s""", id)
    nodeInfo = cursor.fetchall()

    for tuples in nodeInfo:
        for item in tuples:
            nodeInfoArray.append(item)
    print(nodeInfoArray)

    cursor.close()
    mydb.close()

    if type == 'sensor':
        sensorNodeArray = nodeInfoArray
        print(sensorNodeArray)
        return sensorNodeArray

    elif type == 'enquete':
        enqueteNodeArray = nodeInfoArray
        return enqueteNodeArray

As you can see, it works like this:

  1. It queries the database for the MAC addresses of the type of node you want (sensor or enquete).

  2. It executes the command and puts the data in a list.

  3. It uses a nested for loop to get the data out of the tuples and puts it in the nodeInfoArray.

  4. Depending on the type, it updates the sensorNodeArray or the enqueteNodeArray with the new data (nodeInfoArray).

  5. It returns the array with the data.
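Step 3 is the part that is easiest to get wrong, so here it is in isolation. The sketch below uses an invented list shaped like the result of fetchall() for a single-column SELECT and compares the nested loop with an equivalent list comprehension; the sample MAC addresses are made up.

```python
# Shape of cursor.fetchall() for "SELECT MAC FROM Node ...": a list of 1-tuples.
rows = [("AA:AA:AA:AA:AA:AA",), ("BB:BB:BB:BB:BB:BB",)]

# The nested loop from getNodeInfo.
nested = []
for row in rows:
    for item in row:
        nested.append(item)

# Equivalent one-liner: unpack the single column of every row.
flat = [mac for (mac,) in rows]

print(flat)
```

Both versions produce a plain list of strings, which is what the later `in` checks on sensorNodeArray and enqueteNodeArray rely on.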

The filter code

Now that we have the data we can filter the data from the websocket.

    data = await websocket.recv()

    processedData = json.loads(data)
    macAdress = processedData['node']

    if "Temp" in processedData:
        type = 'sensor'
    else:
        type = 'enquete'

    await getNodeInfo('sensor')
    await getNodeInfo('enquete')

    if macAdress in sensorNodeArray:
        nodeID = await getNodeID(macAdress)
        await processSensorNodeData(data, nodeID)
    elif macAdress in enqueteNodeArray:
        nodeID = await getNodeID(macAdress)
        await processEnqueteNodeData(data, nodeID)
    else:
        await newNode(macAdress, type)

As you can see, it's a lot of code to explain. To make it easier, I made a mermaid diagram to show how the code works and what it does.

graph TD
    A[Get data from websocket] --> B{Is it sensor data or enquete data?}
    B -->|sensor| C[Get sensorNodeArray]
    B -->|enquete| D[Get enqueteNodeArray]
    B -->|New node| E[Add new node to database]
    C -->|data| G[Process sensorNodeData]
    D -->|data| H[Process enqueteNodeData]

The function to get the nodeID

This function is used to get the nodeID that belongs to a MAC address. This way we can commit the data with the right ID in the database.

The function to get the nodeID from the MAC address is the following:

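async def getNodeID(macAdress):
    id = (macAdress,)
    mydb = dbLogin()
    cursor = mydb.cursor()
    cursor.execute("""SELECT nodeID FROM Node WHERE MAC = %s""", id)
    data = cursor.fetchall()

    # default to None so an unknown MAC does not raise an UnboundLocalError
    nodeID = None
    for tuples in data:
        for item in tuples:
            nodeID = item

    # release the database resources before returning
    cursor.close()
    mydb.close()
    return nodeID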

  1. It looks up the nodeID in the database using the MAC address.
  2. It executes the command and puts the data in a list.
  3. It uses a nested for loop to get the data out of the tuples and puts it in the nodeID.
  4. It returns the nodeID.
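Since a MAC address maps to at most one node, the lookup can also be done with fetchone() instead of a nested loop. The sketch below is an alternative under assumptions, not the project's code: it uses the built-in sqlite3 module as a stand-in for MySQL (with `?` placeholders instead of `%s`), and the function name `get_node_id` and the table layout are hypothetical.

```python
import sqlite3

# In-memory stand-in for the Node table (assumed columns: nodeID, MAC).
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE Node (nodeID INTEGER PRIMARY KEY, MAC TEXT)")
db.execute("INSERT INTO Node (MAC) VALUES (?)", ("AA:AA:AA:AA:AA:AA",))
db.commit()

def get_node_id(db, mac):
    """Return the nodeID for mac, or None when the MAC is unknown."""
    row = db.execute("SELECT nodeID FROM Node WHERE MAC = ?", (mac,)).fetchone()
    return row[0] if row is not None else None

found = get_node_id(db, "AA:AA:AA:AA:AA:AA")
missing = get_node_id(db, "ZZ:ZZ:ZZ:ZZ:ZZ:ZZ")
```

Returning None for an unknown MAC lets the caller decide what to do instead of failing inside the lookup.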

The function to commit the data from the sensorNodes

This function is a lot like the original one. The only two changes are that it now also commits the nodeID and that the code to add a new node has moved to a separate function.

link to code

The function to commit the data from the enqueteNodes

This function is a lot like the sensorNode function. It commits the data to the enqueteData table in the database instead, and it handles different data.

Link to code

The function to add a new node to the database

This function is used to add a new node to the database. This is used when a new node is connected to the websocket, but not yet in the database.

The function to add a new node to the database is the following:

async def newNode(mac, type):
    id = (mac, type)
    mydb = dbLogin()

    cursor = mydb.cursor()
    cursor.execute("INSERT INTO `Node` (MAC, Type) VALUES (%s, %s)", id)
    print("new node assigned")
    mydb.commit()
    # release the database resources once the insert is committed
    cursor.close()
    mydb.close()

  1. It gets the MAC address and the type of node from the arguments.
  2. It executes the command to add the new node to the database.
  3. It prints that a new node is assigned.
  4. It commits the data to the database.
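The insert-then-commit pattern from the steps above can be tried locally. The sketch below again uses sqlite3 as a stand-in for MySQL, which is an assumption made so the example runs without a server (`?` placeholders instead of `%s`); the function name `new_node` and the table layout are hypothetical. It also returns the new row's ID through the cursor's `lastrowid` attribute.

```python
import sqlite3

# In-memory stand-in for the Node table (assumed columns: nodeID, MAC, Type).
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE Node (nodeID INTEGER PRIMARY KEY, MAC TEXT, Type TEXT)")

def new_node(db, mac, node_type):
    """Insert a node, commit, and return the ID the database assigned to it."""
    cursor = db.cursor()
    try:
        cursor.execute("INSERT INTO Node (MAC, Type) VALUES (?, ?)", (mac, node_type))
        db.commit()
        return cursor.lastrowid
    finally:
        cursor.close()  # runs whether or not the insert succeeded

first_id = new_node(db, "AA:AA:AA:AA:AA:AA", "sensor")
second_id = new_node(db, "BB:BB:BB:BB:BB:BB", "enquete")
stored = db.execute("SELECT MAC, Type FROM Node ORDER BY nodeID").fetchall()
```

Closing the cursor in a `finally` block keeps the cleanup in one place even when the insert raises an error.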