-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcalc-co2.py
executable file
·286 lines (251 loc) · 10.8 KB
/
calc-co2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
#!/bin/python
import csv
import sys
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support.select import Select
# DEPENDENCY INFO:
# install geckodriver or chromedriver
# pip install selenium
# debian et co with externally managed eggs:
# sudo apt install python3-selenium firefox-esr-geckodriver
# if you're not using the snap version of a browser, download the driver manually and
# put it into your $PATH; you might also need to extract the selenium-manager
# of your platform from an official release of python3-selenium
# TODO: add caching, since the final legs are often likely to be the same?
# TODO: add any diagnostics for duplicates etc.? Running it again is a good way to catch them
def parseArgs(argv):
    """Split a raw argument vector into the script's settings.

    argv -- complete argument list, program name included as argv[0]

    Returns (quitter, inputCSV, resultsFile, event), or None when no input
    file remains once the optional "-n" flag has been taken out.
    quitter is False when "-n" was given; resultsFile defaults to
    "results.csv" and event to "".
    """
    remaining = list(argv)
    quitter = True
    # only the first "-n" counts as the flag; it may appear anywhere
    if "-n" in remaining:
        remaining.remove("-n")
        quitter = False
    # check AFTER removing the flag, otherwise "calc-co2.py -n" slips through
    # and fails later with an IndexError
    if len(remaining) < 2:
        return None
    inputCSV = remaining[1]
    resultsFile = remaining[2] if len(remaining) > 2 else "results.csv"
    event = remaining[3] if len(remaining) > 3 else ""
    return quitter, inputCSV, resultsFile, event


parsedArgs = parseArgs(sys.argv)
if parsedArgs is None:
    print("Make sure to pass the input data file path! Bailing out.")
    print("calc-co2.py [-n] inputFile [resultFile [eventName]]")
    sys.exit(1)
quitter, inputCSV, resultsFile, event = parsedArgs
# the flag has always been stripped from sys.argv itself as well; keep doing so
if not quitter:
    sys.argv.remove("-n")
args = sys.argv
# one Firefox session opened at start-up; waitForVisible() and runTest()
# drive the site through this module-level handle
browser = webdriver.Firefox()
# Replacement column names for the input CSV: the fixed leading columns,
# then End/Passengers pairs for legs 1-10, then Mode/Fuel pairs for legs 1-10.
# Rewritten for easier work and to avoid skipping duplicates.
header = (
    ['Submitted', 'Name', 'Event', 'E-mail', 'Legs', 'End0']
    + [column + str(legNo) for legNo in range(1, 11) for column in ('End', 'Passengers')]
    + [column + str(legNo) for legNo in range(1, 11) for column in ('Mode', 'Fuel')]
)
# sentinel stored in place of a result when a leg could not be calculated
errorValue = -10 ** 10
def waitForVisible(timeout, el):
    """Block until the element addressed by `el` is visible in the browser.

    timeout -- maximum number of seconds to keep waiting
    el -- locator tuple, e.g. (By.ID, "origin")

    Whatever WebDriverWait.until() raises when the wait runs out is passed
    on to the caller.
    """
    waiter = WebDriverWait(browser, timeout)
    becomesVisible = expected_conditions.visibility_of_element_located(el)
    waiter.until(becomesVisible)
# fake wait hack (for some reason implicit wait did not work)
def fakeWait(delay = 5):
    """Pause by waiting for an element that never shows up.

    delay -- timeout in seconds handed to waitForVisible (default 5)

    The wait is best effort: any ordinary error, including the expected
    timeout, is ignored and None is returned.
    """
    try:
        waitForVisible(delay, (By.XPATH, "//non-existing"))
    except Exception:
        # Running into the timeout is the whole point.  Catching Exception
        # rather than everything lets Ctrl-C (KeyboardInterrupt) and
        # sys.exit() still get through while the script is pausing.
        pass
def parseEntry(row, writer):
    """Calculate every leg of one input entry and record each of them.

    row -- dict for one input row; reads "Legs", "Event", "Name" and the
           numbered "End<i>", "Mode<i>", "Fuel<i>" and "Passengers<i>" columns
    writer -- object with a writerow(dict) method; called once per leg

    Returns (total, kms): the per-leg emissions and kilometers reported by
    prepCalc, summed over all legs.  Raises ValueError if "Legs" is not an
    integer and KeyError if a leg refers to a column the row does not have.
    """
    legs = int(row["Legs"])
    total = 0
    kms = 0
    for leg in range(1, legs + 1):
        # append commas to avoid selecting common rooted names that sort earlier for short inputs
        start = row["End" + str(leg - 1)] + ","
        end = row["End" + str(leg)] + ","
        mode = row["Mode" + str(leg)]
        fuel = row["Fuel" + str(leg)]
        try:
            passengers = int(row["Passengers" + str(leg)])
        except (KeyError, TypeError, ValueError):
            # column missing, empty or not a number: assume a lone traveller
            passengers = 1
        if passengers < 1:
            # emissions get divided by the head count further down; zero or
            # a negative count would crash or flip the sign of the result
            passengers = 1

        # for debugging a single route, hard-code start/end/mode/fuel here,
        # e.g. start = "Tolmin,", end = "London,", mode = "Bus"

        legEmissions, km = prepCalc(start, end, mode, fuel, passengers)
        writer.writerow({ 'Event': row["Event"], 'Name': row["Name"], 'From': start[:-1], 'To': end[:-1], 'Mode': mode, 'Fuel': fuel, 'People': passengers, 'CO2': legEmissions, 'Kilometers': km })
        total = total + legEmissions
        kms = kms + km
    return total, kms
def runTest(start, end, mode, fuel):
    """Scrape travelandclimate.org for the footprint of a single one-way leg.

    start -- text typed into the origin box; the first suggestion is picked
    end -- text typed into the destination box; the first suggestion is picked
    mode -- "Car", "Bus", "Train", "Plane", "Ferry", "Motorbike", "Bike"
            or "Walk"
    fuel -- fuel name for car-based legs; only the names listed in
            fuelLabels change the fuel preselected by the site

    Returns (kg, km) for one person.  Identical start and end give (0, 0)
    without touching the browser.  "Ferry" legs give (0, 0) and unknown
    modes (errorValue, errorValue) once the trip form has been submitted.
    A transport button that never shows up also gives
    (errorValue, errorValue).  Any other scraping problem is raised to the
    caller, as is RuntimeError for an unexpected landing page.
    """
    if start == end:
        return 0, 0

    browser.get("https://travelandclimate.org/")
    # explicit check instead of an assert, which disappears under "python -O"
    if 'Travel' not in browser.title:
        raise RuntimeError("unexpected page title: {!r}".format(browser.title))

    # 1 person
    browser.find_element(By.ID, "people").click()
    browser.find_element(By.ID, "people").send_keys("1")

    # set one-way trip
    browser.find_element(By.ID, "ways").click()
    browser.find_element(By.CSS_SELECTOR, "#ways > :nth-child(2)").click()

    # set start
    browser.find_element(By.ID, "origin").click()
    browser.find_element(By.ID, "origin").send_keys(start)
    fakeWait(1)
    waitForVisible(10, (By.CSS_SELECTOR, ".pac-item:nth-child(1)"))
    browser.find_element(By.CSS_SELECTOR, ".pac-item:nth-child(1)").click()

    # set destination
    browser.find_element(By.ID, "destinations.0").click()
    browser.find_element(By.ID, "destinations.0").send_keys(end)
    fakeWait(1)
    waitForVisible(10, (By.CSS_SELECTOR, ".pac-item:nth-child(1)"))
    destEl = browser.find_element(By.CSS_SELECTOR, ".pac-item:nth-child(1)")
    # sigh, child blocks it, but luckily is not a full overlay
    elH = destEl.rect["height"]
    elW = destEl.rect["width"]
    ActionChains(browser).move_to_element_with_offset(destEl, - elW // 4, elH // 2 - 1).click().perform()

    # ignore sleep and trigger calc
    browser.find_element(By.XPATH, "//button[contains(text(), 'Calculate')]").click()

    # wait for initial calc and then choose transport type
    # to id xpath in dev tools: $x("some xpath")
    # normal car:
    button = "t-driving"
    scaleFactor = 1
    originalMode = mode
    colSelector = ".column-driving .bg-drivingcar, .column-driving .bg-carferrycar"
    if mode == "Car":
        pass
    elif mode == "Bus" or mode == "Train":
        # NOTE: sometimes only offers a train for bus rides,
        # probably vice-versa as well. However the emission factors are
        # similar, so we don't mind
        button = "t-public-transport"
        colSelector = ".column-public-transport .bg-publictransport"
    elif mode == "Plane":
        # NOTE: includes shuttle to city if relevant
        # FIXME: long flights can have so high emissions the chart offsets elements enough
        # for this to fail, eg. with Houston to Athens
        button = "t-flying"
        colSelector = ".column-flying > .bg-flying"
    elif mode == "Ferry":
        # taken into account internally under Car
        # FIXME: unless there's no other option, then there's an independent ferry button
        # hit e.g. with Helsinki - Tallinn
        return 0, 0
    elif mode == "Motorbike":
        # NOTE: comparing average fuel consumption is tricky, after some scouring
        # we take half of the car value
        mode = "Car"
        scaleFactor = 0.5
    elif mode == "Bike" or mode == "Walk":
        mode = "Car"
    else:
        print("unknown mode! " + mode)
        return errorValue, errorValue

    # actually pick ride type; imported here because only this step needs it
    from selenium.common.exceptions import NoSuchElementException, TimeoutException
    fakeWait(2)
    try:
        waitForVisible(10, (By.CLASS_NAME, button))
        ride = browser.find_element(By.CLASS_NAME, button)
    except (NoSuchElementException, TimeoutException):
        # a missing button surfaces as an exception, never as a falsy
        # element, so this is where the "no connection" case is reported
        print("No known connection for leg type: " + originalMode)
        return errorValue, errorValue
    ride.click()

    # extra steps to get length of leg and pick fuel
    # NOTE: there could be more than one to pick (eg. with interim ferries, for airport access or changing buses)
    # open details column overlay and extract leg distance
    # there can be more than one valid bar (common with buses or if a ferry interrupts cars)
    fakeWait(2)
    waitForVisible(5, (By.CSS_SELECTOR, colSelector))
    bars = browser.find_elements(By.CSS_SELECTOR, colSelector)
    km = 0
    seenKMs = set() # hack to avoid occasional double counting
    for bar in bars:
        if not bar.is_displayed() or "hidden" in bar.get_dom_attribute("class"):
            continue
        ActionChains(browser).move_to_element(bar).perform()
        fakeWait(2)
        # yuck, occasional XPATH failure, so we have to filter manually
        # NOTE(review): the leading "//" searches the whole document, not just
        # below `bar` (".//" would scope it); presumably that is why the
        # filtering and seenKMs are needed -- confirm where the distance
        # label lives in the DOM before tightening this
        kmEls = bar.find_elements(By.XPATH, "//div/div/div[contains(text(), ' km')]")
        for kmEl in kmEls:
            # read the text once: every access is a round trip to the browser
            label = kmEl.text
            if " km" in label and label not in seenKMs:
                seenKMs.add(label)
                km += int(label.split()[0])
                break

    # change fuel if not diesel; keys are the input names, values the
    # option texts shown in the site's fuel selector
    fuelLabels = {
        "Petrol": "gasoline",
        "Natural gas": "gas (fossil)",
        "Biogas": "gas (bio)",
        "Ethanol": "ethanol",
        "Biodiesel": "hvo 100",
        "Electricity": "electricity",
    }
    if mode == "Car" and fuel in fuelLabels:
        # pick fuel
        els = browser.find_element(By.XPATH, "//select[contains(@*,'selectedDrivingFuel')]")
        Select(els).select_by_visible_text(fuelLabels[fuel])
    # let the page settle (after a possible fuel change) before moving on
    fakeWait(2)

    # choose random useless accommodation, so we can trigger the final calculation
    browser.find_element(By.CLASS_NAME, "t-accommodation").click()
    resultH2 = '//h2[small[text()="kg"]]'
    waitForVisible(5, (By.XPATH, resultH2))

    # grab and clean up the result
    emissions = browser.find_element(By.XPATH, resultH2).text
    kg = round(int(emissions.split()[0]) * scaleFactor)
    # NOTE(review): Bike/Walk legs report the unscaled car emissions with the
    # distance zeroed; zero emissions with the real distance looks more
    # plausible -- confirm the intent before changing
    if originalMode == "Bike" or originalMode == "Walk":
        km = 0
    return (kg, km)
def prepCalc(start, end, mode, fuel, passengers):
    """Run runTest for one leg, report it, and share the emissions per person.

    start, end, mode, fuel -- handed to runTest unchanged
    passengers -- number of people sharing the leg; values below 1 count as 1

    Returns (emissions, km): the emissions from runTest divided by the
    passenger count and rounded, plus the kilometers from runTest.  When
    runTest fails both start out as errorValue.
    NOTE: the error sentinel is divided like any other value, so a failed
    shared leg stays hugely negative but is not exactly errorValue.
    """
    print("From {} to {} with {} ({}) and {} people: ".format(start, end, mode, fuel, passengers), end='')
    try:
        emissions, km = runTest(start, end, mode, fuel)
    except Exception as err:
        # Exception rather than a bare except, so Ctrl-C stops the run instead
        # of being recorded as a failed leg; name the failure for diagnosis
        print("[{}] ".format(type(err).__name__), end='', file=sys.stderr)
        emissions = km = errorValue
    # never divide by zero or a negative head count
    emissions = round(emissions / max(passengers, 1))
    print("{} kg from travelling {} km".format(emissions, km))
    return (emissions, km)
#######################################################################
# main startup
#######################################################################
# prepare a file to save results in and also to skip calculations if done
outHeader = [ "Event", "Name", "From", "To", "Mode", "Fuel", "People", "CO2", "Kilometers" ]


def loadDoneLegs(path):
    """Read the results file and count the legs already stored per entry.

    path -- results CSV; created empty if it does not exist yet

    Returns (needsHeader, done).  needsHeader is True when the file is
    empty; done maps (event, name) to the number of result rows stored.
    Rows are matched on the parsed Event and Name columns, so a name that
    is contained in another name or in a place column is not miscounted.
    """
    # sigh, ensure it exists, since python can't open it for reading otherwise
    open(path, 'a').close()
    done = {}
    with open(path, newline='') as doneFile:
        needsHeader = doneFile.read(1) == ''
        doneFile.seek(0)
        for doneRow in csv.DictReader(doneFile, fieldnames=outHeader):
            key = (doneRow["Event"], doneRow["Name"])
            done[key] = done.get(key, 0) + 1
    # the header line was parsed like any other row; drop it again
    done.pop(("Event", "Name"), None)
    return needsHeader, done


needsHeader, doneLegs = loadDoneLegs(resultsFile)
rc = 0
total = 0
kms = 0
people = 0
try:
    with open(inputCSV, newline='') as inFile, open(resultsFile, 'a', newline='') as outFile:
        reader = csv.DictReader(inFile, header)
        writer = csv.DictWriter(outFile, fieldnames=outHeader)
        if needsHeader:
            writer.writeheader()
        # skip bad header (our own column names are used instead)
        next(reader, None)
        for row in reader:
            # skip non-matching events
            if event and row["Event"] != event:
                continue
            # is the result already calculated?
            legs = int(row["Legs"])
            mentions = doneLegs.get((row["Event"], row["Name"]), 0)
            if mentions == legs:
                continue
            if mentions > 0:
                # any other non-zero count means the stored rows and this
                # entry disagree; recalculating would only append duplicates
                if mentions < legs:
                    print("ERROR: partial results detected for {}, bailing out.".format(row["Name"]))
                else:
                    print("ERROR: more results than legs detected for {}, bailing out.".format(row["Name"]))
                print("Perhaps there are several input entries?")
                rc = 1
                break
            people = people + 1
            emissions, km = parseEntry(row, writer)
            print("Emissions {} kg from {} travelling {} km".format(emissions, row["Name"], km))
            # keep finished entries on disk even if a later one fails
            outFile.flush()
            total = total + emissions
            kms = kms + km
            fakeWait(1) # just to be nice to the server
    print("Total emissions: {} kg from {}+ people travelling {} km".format(total, people, kms))
finally:
    # close the browser even when an entry blew up, unless -n asked to keep it
    if quitter:
        browser.quit()
sys.exit(rc)