From 146e0c2eea826314ff1fc306b93f47b97cb49330 Mon Sep 17 00:00:00 2001 From: sietse jonker Date: Tue, 26 Mar 2024 21:40:34 +0100 Subject: [PATCH 1/3] Add data processing functions and WebSocket connection handling --- server/data.py | 121 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 server/data.py diff --git a/server/data.py b/server/data.py new file mode 100644 index 0000000..891fd8f --- /dev/null +++ b/server/data.py @@ -0,0 +1,121 @@ +import asyncio +import websockets +import mysql.connector +import json + +sensorNodeArray = [] +enqueteNodeArray = [] + +async def processSensorNodeData(data): + try: + mydb = dbLogin() + cursor = mydb.cursor() + + # MACDataReading = mydb.cursor() + # MACDataReading.execute("SELECT MAC FROM Node") + + print('some_response') + query = "INSERT INTO `Measurement` (NodeID, Type, Value) VALUES (%s, %s, %s)" + processedData = json.loads(data) + + processedTemp = (processedData['Temp']) + processedHumi = (processedData['Humi']) + processedeCO2 = (processedData['eCO2']) + processedTVOC = (processedData['TVOC']) + processedMAC = (processedData['node']) + MACTuple = (processedMAC,) + + # MACDataFetching = MACDataReading.fetchall() + # MACArray = list(MACDataFetching) + + + # if MACTuple not in MACArray: + # addingNode = "INSERT INTO `Node` (MAC) VALUES (%s)" + # cursor.execute(addingNode, MACTuple) + # mydb.commit() + + pushingDataArray = [(1, "Temp", processedTemp), (1, "Humi", processedHumi), (1, "eCO2", processedeCO2), (1, "TVOC", processedTVOC)] + for i in pushingDataArray: + print(query ,i) + cursor.execute(query, i) + mydb.commit() + + except mysql.connector.Error as err: + print("MySQL Error:", err) + finally: + cursor.close() + mydb.close() + +async def receive_data(): + uri = "ws://145.92.8.114/ws" + + try: + async with websockets.connect(uri) as websocket: + while True: + data = await websocket.recv() + print(f"Received data: {data}") + processedData = json.loads(data) + macAdress = processedData['node'] + if processedData["Temp"]: + type = 'sensor' + else: + type = 'enquete' + + await getNodeInfo('sensor') + await getNodeInfo('enquete') + + if macAdress in sensorNodeArray: + await processSensorNodeData(data) + elif macAdress in enqueteNodeArray: + await processEnqueteNodeData(data) + else: + await newNode(macAdress, type) + except websockets.ConnectionClosedError as e: + print("WebSocket connection closed:", e) + +async def processEnqueteNodeData(data): + mydb = dbLogin() + cursor = mydb.cursor() + + query = "INSERT INTO `Response` (NodeID, QuestionID, Response) VALUES (%s, %s, %s)" + processedData = json.loads(data) + + + +async def main(): + await receive_data() + +def dbLogin(): + mydb = mysql.connector.connect( + host="localhost", + user="root", + password="Dingleberries69!", + database="NodeData" + ) + + return mydb + +async def getNodeInfo(type): + id = (type,) + mydb = dbLogin() + cursor = mydb.cursor() + cursor.execute("""SELECT * FROM Node WHERE Type = %s""", id) + nodeInfo = cursor.fetchall() + print(nodeInfo) + if nodeInfo != None: + if type == 'sensor': + sensorNodeArray.append(nodeInfo[]) + elif type == 'enquete': + enqueteNodeArray.append(nodeInfo) + +async def newNode(mac, type): + id = (mac, type,) + mydb = dbLogin() + + cursor = mydb.cursor() + cursor.execute("INSERT INTO `Node` (MAC, Type) VALUES (%s, %s)", id) + print("new node assigned") + mydb.commit() + +asyncio.run(main()) + From 44a2d1e5c6897c4bb62aeb0992511863b49f3eb1 Mon Sep 17 00:00:00 2001 From: sietse jonker Date: Tue, 26 Mar 2024 22:25:07 +0100 Subject: [PATCH 2/3] Refactor getNodeInfo function to store node information in separate arrays --- server/data.py | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/server/data.py b/server/data.py index 891fd8f..e6acb5d 100644 --- a/server/data.py +++ b/server/data.py @@ -54,8 +54,10 @@ async def receive_data(): while True: data = await websocket.recv() print(f"Received data: {data}") + processedData = json.loads(data) macAdress = processedData['node'] + if processedData["Temp"]: type = 'sensor' else: @@ -64,9 +66,11 @@ async def receive_data(): await getNodeInfo('sensor') await getNodeInfo('enquete') - if macAdress in sensorNodeArray: + print(str(sensorNodeArray)) + + if str(macAdress) in sensorNodeArray: await processSensorNodeData(data) - elif macAdress in enqueteNodeArray: + elif str(macAdress) in enqueteNodeArray: await processEnqueteNodeData(data) else: await newNode(macAdress, type) @@ -96,17 +100,31 @@ def dbLogin(): return mydb async def getNodeInfo(type): + nodeInfoArray = [] + id = (type,) mydb = dbLogin() cursor = mydb.cursor() - cursor.execute("""SELECT * FROM Node WHERE Type = %s""", id) + cursor.execute("""SELECT MAC FROM Node WHERE Type = %s""", id) nodeInfo = cursor.fetchall() - print(nodeInfo) - if nodeInfo != None: - if type == 'sensor': - sensorNodeArray.append(nodeInfo[]) - elif type == 'enquete': - enqueteNodeArray.append(nodeInfo) + + for tuples in nodeInfo: + for item in tuples: + nodeInfoArray.append(item) + print(nodeInfoArray) + + if type == 'sensor': + sensorNodeArray = nodeInfoArray + print(sensorNodeArray) + return sensorNodeArray + + elif type == 'enquete': + enqueteNodeArray = nodeInfoArray + return enqueteNodeArray + + + + async def newNode(mac, type): id = (mac, type,) From 970b7eaa41a1fd111412dfefb99957240445e73e Mon Sep 17 00:00:00 2001 From: sietse jonker Date: Tue, 26 Mar 2024 22:46:50 +0100 Subject: [PATCH 3/3] it works yippee (depending on what node sends it data the script will send it to the corresponding table with automatic node numbering) --- server/data.py | 49 ++++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/server/data.py b/server/data.py index e6acb5d..c932aa2 100644 --- a/server/data.py +++ b/server/data.py @@ -6,15 +6,11 @@ import json sensorNodeArray = [] enqueteNodeArray = [] -async def processSensorNodeData(data): +async def processSensorNodeData(data, nodeID): try: mydb = dbLogin() cursor = mydb.cursor() - # MACDataReading = mydb.cursor() - # MACDataReading.execute("SELECT MAC FROM Node") - - print('some_response') query = "INSERT INTO `Measurement` (NodeID, Type, Value) VALUES (%s, %s, %s)" processedData = json.loads(data) @@ -23,18 +19,8 @@ async def processSensorNodeData(data): processedeCO2 = (processedData['eCO2']) processedTVOC = (processedData['TVOC']) processedMAC = (processedData['node']) - MACTuple = (processedMAC,) - # MACDataFetching = MACDataReading.fetchall() - # MACArray = list(MACDataFetching) - - - # if MACTuple not in MACArray: - # addingNode = "INSERT INTO `Node` (MAC) VALUES (%s)" - # cursor.execute(addingNode, MACTuple) - # mydb.commit() - - pushingDataArray = [(1, "Temp", processedTemp), (1, "Humi", processedHumi), (1, "eCO2", processedeCO2), (1, "TVOC", processedTVOC)] + pushingDataArray = [(nodeID, "Temp", processedTemp), (nodeID, "Humi", processedHumi), (nodeID, "eCO2", processedeCO2), (nodeID, "TVOC", processedTVOC)] for i in pushingDataArray: print(query ,i) cursor.execute(query, i) @@ -63,15 +49,19 @@ async def receive_data(): else: type = 'enquete' + + await getNodeInfo('sensor') await getNodeInfo('enquete') - print(str(sensorNodeArray)) + print(sensorNodeArray) - if str(macAdress) in sensorNodeArray: - await processSensorNodeData(data) - elif str(macAdress) in enqueteNodeArray: - await processEnqueteNodeData(data) + if macAdress in sensorNodeArray: + nodeID = await getNodeID(macAdress) + await processSensorNodeData(data, nodeID) + elif macAdress in enqueteNodeArray: + nodeID = await getNodeID(macAdress) + await processEnqueteNodeData(data, nodeID) else: await newNode(macAdress, type) except websockets.ConnectionClosedError as e: @@ -100,6 +90,9 @@ def dbLogin(): return mydb async def getNodeInfo(type): + global sensorNodeArray + global enqueteNodeArray + nodeInfoArray = [] id = (type,) @@ -113,6 +106,9 @@ async def getNodeInfo(type): nodeInfoArray.append(item) print(nodeInfoArray) + cursor.close() + mydb.close() + if type == 'sensor': sensorNodeArray = nodeInfoArray print(sensorNodeArray) @@ -123,6 +119,17 @@ async def getNodeInfo(type): return enqueteNodeArray +async def getNodeID(macAdress): + id = (macAdress,) + mydb = dbLogin() + cursor = mydb.cursor() + cursor.execute("""SELECT nodeID FROM Node WHERE MAC = %s""", id) + data = cursor.fetchall() + + nodeID = data[0][0] + + return nodeID +