-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
133 lines (115 loc) · 4.63 KB
/
client.py
File metadata and controls
133 lines (115 loc) · 4.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio
import argparse
import comm
import util
import time
import sys
import psycopg2
async def send_execute_request(coordinator, node_id, query, args=tuple()):
kind = "EXECUTE"
data = {"node_id": node_id,
"query": query,
"args": args}
return await coordinator.send(kind, data)
async def main():
argparser = argparse.ArgumentParser()
argparser.add_argument("--coordinator", type=util.hostname_port_type, required=True)
argparser.add_argument("--demo", choices=demo_tables.keys())
argparser.add_argument("--n-nodes", type=int)
argparser.add_argument("--data-db")
args = argparser.parse_args()
coordinator_hostname, coordinator_port = args.coordinator
is_demo = False
if args.demo:
if not args.n_nodes:
print("For demo mode, the total number of participant nodes must be given with --n-nodes.")
if not args.data_db:
print("For demo mode, a source database must be given with --data-db.")
if not args.n_nodes or not args.data_db:
return 1
is_demo = True
else:
if args.n_nodes or args.data_db:
print("Arguments --n-nodes and --data-db may only be given in demo mode.")
coordinator = comm.RemoteCallClient(coordinator_hostname, coordinator_port)
await coordinator.connect()
print("Connected to coordinator at {}:{}.".format(coordinator_hostname, coordinator_port))
if not is_demo:
await interactive_ui(coordinator)
else:
await demo_ui(coordinator, args.data_db, args.n_nodes, args.demo)
async def interactive_ui(coordinator):
keep_going = True
while keep_going:
print("QUERY --------------------")
sys.stdout.write("Execute query: ")
query = input()
sys.stdout.write("On this node #: ")
node_id = input()
success = await send_execute_request(coordinator, node_id, query)
if not success:
print("Error: EXECUTE was not successful. (Server may be blocked at previous transaction.)")
sys.stdout.write("Send another query request? (y/n) ")
keep_going = (input().lower() == "y")
async def demo_ui(coordinator, data_db, n_nodes, table):
columns = demo_tables[table]
create_table_query = demo_create_tables[table]
print("Initializing tables on all participant nodes.")
for i in range(n_nodes):
print(f"Sending CREATE TABLE {table} query to node {i}.")
success = await send_execute_request(coordinator, i, create_table_query)
if not success:
print(f"Failed.")
return
source_db = psycopg2.connect(data_db)
source_cur = source_db.cursor()
source_cur.execute("select * from " + table)
try:
for row in source_cur:
timestamp_i = columns.index("timestamp")
row = list(row)
row[timestamp_i] = row[timestamp_i].strftime('%Y-%m-%d %H:%M:%S')
node_id = hash(row[0]) % n_nodes
query = f"INSERT INTO {table} ({','.join(columns)}) VALUES ({', '.join(['%s']*len(columns))})"
print(f"Sending INSERT {row[0]} request to node {node_id}.")
success = await send_execute_request(coordinator, node_id, query, tuple(row))
if not success:
print("Failed.")
print("Sleeping for one second.")
time.sleep(1)
except KeyboardInterrupt:
print("Killed.")
demo_tables = {
"thermometerobservation": ("id", "temperature", "timestamp", "sensor_id"),
"wemoobservation": ("id", "currentmilliwatts", "ontodayseconds", "timestamp", "sensor_id"),
"wifiapobservation": ("id", "clientid", "timestamp", "sensor_id")
}
demo_create_tables = {
"thermometerobservation":
"""CREATE TABLE IF NOT EXISTS ThermometerObservation (
id varchar(255) NOT NULL,
temperature integer DEFAULT NULL,
timeStamp timestamp NOT NULL,
sensor_id varchar(255) DEFAULT NULL,
PRIMARY KEY (id)
)""",
"wemoobservation":
"""CREATE TABLE IF NOT EXISTS wemoobservation (
id varchar(255) NOT NULL,
currentMilliWatts integer DEFAULT NULL,
onTodaySeconds integer DEFAULT NULL,
timeStamp timestamp NOT NULL,
sensor_id varchar(255) DEFAULT NULL,
PRIMARY KEY (id)
)""",
"wifiapobservation":
"""CREATE TABLE IF NOT EXISTS WiFiAPObservation (
id varchar(255) NOT NULL,
clientId varchar(255) DEFAULT NULL,
timeStamp timestamp NOT NULL,
sensor_id varchar(255) DEFAULT NULL,
PRIMARY KEY (id)
)"""
}
if __name__ == "__main__":
asyncio.run(main())