9.0 KiB
Original Version (by bram)
Python code + explaination
We wanted to make a working connection between our websocket wich runs all the data gatherd by our nodes and a live feed to our database. So we set out to make this connection using python.
At first we needed to import the folowing librarys:
// everything is running async so the code can run together and not interfere with eachother.
import asyncio
import websockets
// a library to connect to the database.
import mysql.connector
import json
Then we began the process of connecting with both the websocket and the database.
First-off, we began making a connection to the database by using a mysql library in wich we gave the log in in order to connect to our ow database.
//the data that has to be pushed needs to be
async def process_data(data):
try:
mydb = mysql.connector.connect(
host="localhost",
user="*****",
password="*********",
database="*******"
)
Then after this we code in the infromation we want to put inside of the database. The data collected from the websocket is json data, so this has to be changed.
//Making a variable for the database connection
cursor = mydb.cursor()
//Making a variable for the database connection
MACDataReading = mydb.cursor()
//find the correct section and interact with it.
MACDataReading.execute("SELECT MAC FROM Node")
//the query for what needs to be inserted into the database ( atleast where it has to go).
query = "INSERT INTO `Measurement` (NodeID, Type, Value) VALUES (%s, %s, %s)"
//the recieved data from the websocket is json data so needs to be changed.
processedData = json.loads(data)
//variables about the recieved data points.
processedTemp = (processedData['Temp'])
processedHumi = (processedData['Humi'])
processedECo = (processedData['eCO2'])
processedTvoc = (processedData['TVOC'])
processedMAC = (processedData['node'])
//change the recieved macadress to a tuple.
MACTuple = (processedMAC,)
//fetch the data from the database an[d put it in an array.
MACDataFetching = MACDataReading.fetchall()
MACArray = list(MACDataFetching)
//see if the fetched data is not in the gotten array.
//otehrwise insert it into the database directly.
if MACTuple not in MACArray:
addingNode = "INSERT INTO `Node` (MAC) VALUES (%s)"
cursor.execute(addingNode, MACTuple)
mydb.commit()
//the websocket data that needs to be sent to the database.
pushingDataArray = [(1, "Temp", processedTemp), (1, "Humi", processedHumi), (1, "eCO2", processedECo), (1, "TVOC", processedTvoc)]
// forloop, go allong the array.
for i in pushingDataArray:
print(query ,i)
cursor.execute(query, i)
mydb.commit()
// show me the error it there is one.
except mysql.connector.Error as err:
print("MySQL Error:", err)
finally:
cursor.close()
mydb.close()
After fully connecting t othe database, making statements of what to put there and telling the code what to do, we ofcourse need to write the code to connect to the weebsocket. We begin by telling our websocket id and what type of port we are using. Then we will collect live data from the conected websocket, store it in a variable, and then in the previous code
//here the connection to the websocked is made
async def receive_data():
uri = "****************"
try:
async with websockets.connect(uri) as websocket:
while True:
// the data collected from the websocket is.
data = await websocket.recv()
// data recieved: conformation.
print(f"Received data: {data}")
await process_data(data)
// error sowing.
except websockets.ConnectionClosedError as e:
print("WebSocket connection closed:", e)
async def main():
await receive_data()
asyncio.run(main())
New Version (by sietse)
Changes made
The original code was a good start, but it had some issues. The code could only handle the data from the sensorNodes and didn't include the nodeID for measurements.
Since we have 2 kind of nodes (sensorNodes and enqueteNodes) we needed to make another function to commit the enqueteData in the database. I have also made a filter to know which data is from the sensorNodes and which data is from the enqueteNodes. This way we can commit the data to the right table in the database.
I have also added a function to get the nodeID from the MAC address. This way we can commit the data to the right node in the database.
The new "filter" code
Function to get a list with macAdresses from the sensorNodes and enqueteNodes
To filter i have made 2 lists, one with all the mac adresses of the sensorNodes and the other with the mac adresses of the enqueteNodes.
The function that handles that and updates the list is the following:
async def getNodeInfo(type):
global sensorNodeArray
global enqueteNodeArray
nodeInfoArray = []
id = (type,)
mydb = dbLogin()
cursor = mydb.cursor()
cursor.execute("""SELECT MAC FROM Node WHERE Type = %s""", id)
nodeInfo = cursor.fetchall()
for tuples in nodeInfo:
for item in tuples:
nodeInfoArray.append(item)
print(nodeInfoArray)
cursor.close()
mydb.close()
if type == 'sensor':
sensorNodeArray = nodeInfoArray
print(sensorNodeArray)
return sensorNodeArray
elif type == 'enquete':
enqueteNodeArray = nodeInfoArray
return enqueteNodeArray
As you can it works like this:
-
It gets the MAC adresses from the database with the type of node you want to get the data from. (sensor or enquete)
-
It executes the command and puts the data in a list.
-
It uses a nested for loop to get the data out of the tuples and puts it in the nodeInfoArray.
-
It updates, depending on what type, the sensorNodeArray or the enqueteNodeArray with the new data (NodeInfoArray).
-
It returns the array with the data.
The filter code
Now that we have the data we can filter the data from the websocket.
data = await websocket.recv()
processedData = json.loads(data)
macAdress = processedData['node']
if "Temp" in processedData:
type = 'sensor'
else:
type = 'enquete'
await getNodeInfo('sensor')
await getNodeInfo('enquete')
if macAdress in sensorNodeArray:
nodeID = await getNodeID(macAdress)
await processSensorNodeData(data, nodeID)
elif macAdress in enqueteNodeArray:
nodeID = await getNodeID(macAdress)
await processEnqueteNodeData(data, nodeID)
else:
await newNode(macAdress, type)
As you can see its alot of code to explain. So to make it easier i made a mermaid diagram to show how the code works / what it does.
graph TD
A[Get data from websocket] --> B{Is it sensor data or enquete data?}
B -->|sensor| C[Get sensorNodeArray]
B -->|enquete| D[Get enqueteNodeArray]
B -->|New node| E[Add new node to database]
C -->|data| G[Process sensorNodeData]
D -->|data| H[Process enqueteNodeData]
The function to get the nodeID
This function is used to get the nodeID from the MAC adress. This way we can commit the data with the right id in the database.
The function to get the nodeID from the MAC adress is the following:
async def getNodeID(macAdress):
id = (macAdress,)
mydb = dbLogin()
cursor = mydb.cursor()
cursor.execute("""SELECT nodeID FROM Node WHERE MAC = %s""", id)
data = cursor.fetchall()
for tuples in data:
for item in tuples:
nodeID = item
return nodeID
- It gets the nodeID from the database with the MAC adress.
- It executes the command and puts the data in a list.
- It uses a nested for loop to get the data out of the tuples and puts it in the nodeID.
- It returns the nodeID.
The function to commit the data from the sensorNodes
This function is alot like the original one, with the only 2 changes being that it now also commits the nodeID and that the code to make a new node is now in a different function.
The function to commit the data from the enqueteNodes
This function is alot like the sensorNode function. It just commits the data to the enqueteData table in the database. And it has another data.
The function to add a new node to the database
This function is used to add a new node to the database. This is used when a new node is connected to the websocket, but not yet in the database.
The function to add a new node to the database is the following:
async def newNode(mac, type):
id = (mac, type)
mydb = dbLogin()
cursor = mydb.cursor()
cursor.execute("INSERT INTO `Node` (MAC, Type) VALUES (%s, %s)", id)
print("new node assigned")
mydb.commit()
- It gets the MAC adress and the type of node from the arguments.
- It executes the command to add the new node to the database.
- It prints that a new node is assigned.
- It commits the data to the database.