-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathvhsapi.py
More file actions
76 lines (69 loc) · 3.22 KB
/
vhsapi.py
File metadata and controls
76 lines (69 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import requests
from time import sleep
class VHSApi:
# The VHS API runs at http://api.vanhack.ca/
# Source code is at https://github.com/vhs/api/blob/master/lib/VHSAPI.pm
# The online API supports a wide range of queries. (But this code does not)
# '<hostname>/s/<spacename>/data/<dataname>/update?value=<datavalue>'
# '<hostname>/s/<spacename>/data/<dataname>/fullpage'
# '<hostname>/s/<spacename>/data/<dataname>'
# '<hostname>/s/<spacename>/data/<dataname>.json'
# '<hostname>/s/<spacename>/data/<dataname>.txt'
# '<hostname>/s/<spacename>/data/<dataname>.js'
# '<hostname>/s/<spacename>/data/<dataname>/feed'
# '<hostname>/s/<spacename>/data/history/<dataname>.json'
api_update_str = '/update?value='
def __init__(self, dataURL = 'http://api.vanhack.ca/s/vhs/data/', timeout = 5):
self.baseURL = dataURL
self.timeout = timeout
def WaitForConnect(self, dataname):
#Periodically queries the API until it receives a successful response.
#dataname is the variable we query the API for.
sleepAmt = .25
sleepMax = 16
while self.Query(dataname) == False:
print('Waiting ' + str(sleepAmt) + 's for retry...')
sleep(sleepAmt)
if sleepAmt < sleepMax:
sleepAmt *= 2
def UpdateIfNecessary(self, dataname, datavalue):
#Query the dataname to see if it matches datavalue, and then
#update with datavalue if they are different. This prevents
#needless writes, which is important because each API variable
#has a timestamp that is updated every time it changes.
q = self.Query(dataname)
if q != False and q != datavalue:
self.Update(dataname, datavalue)
def Query(self, dataname):
#Returns the value of dataname from the VHSApi server, or False if query failed.
try:
r = requests.get( self.baseURL + dataname + '.json' , timeout = self.timeout )
if r.status_code == requests.codes.ok:
#Expected json response in format:
#{"last_updated":<unixtimestamp>,"name":"<dataname>","value":"<datavalue>"}
j = r.json()
if (j['name'] == dataname):
print('VHSApi Query "' + dataname + '" is "' + j['value'] + '"')
return j['value']
print('VHSApi Query of "' + dataname + '" failed.')
print('Response code: ' + str(r.status_code))
print('Response text: ' + r.text)
except Exception as e:
print('VHSApi Query failed: ', str(e))
return False
def Update(self, dataname, datavalue):
try:
r = requests.get( self.baseURL + dataname + self.api_update_str + datavalue , timeout = self.timeout )
if r.status_code == requests.codes.ok:
#Expected json response in format:
#{"result":{"value":"<datavalue>","last_updated":<unixtimestamp>,"name":"<dataname>"},"status":"OK"}
j = r.json()
if (j['status'] == 'OK' and j['result']['name'] == dataname and j['result']['value'] == datavalue):
print('VHSApi Update "' + dataname + '" to "' + datavalue + '"')
return datavalue
print('VHSApi Update of "' + dataname + '" failed.')
print('Response code: ' + str(r.status_code))
print('Response text: ' + r.text)
except Exception as e:
print('VHSApi Update failed: ' + str(e))
return False