Fix both broken weather modules (#789)

* Fix both broken weather modules

The Weather.com changed its API again, this fixes that as well as fixing
a bug that caused the weather conditions to always show as "None".

For Weather Underground, the module had been broken for some time due to
the discontinuation of their API. The module has been rewritten to use
the same API calls that the website itself uses.

* Fix double-click browser launch in wunderground module

* Add example of longer location_code for weather.com
This commit is contained in:
Erik Johnson 2020-08-27 20:49:26 -05:00 committed by GitHub
parent a7c24e43b2
commit d5082fab73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 174 additions and 245 deletions

View File

@ -2,7 +2,7 @@ import json
import re import re
import threading import threading
import time import time
from urllib.request import urlopen from urllib.request import Request, urlopen
from i3pystatus import SettingsBase, IntervalModule, formatp from i3pystatus import SettingsBase, IntervalModule, formatp
from i3pystatus.core.util import user_open, internet, require from i3pystatus.core.util import user_open, internet, require
@ -12,32 +12,37 @@ class WeatherBackend(SettingsBase):
settings = () settings = ()
@require(internet) @require(internet)
def api_request(self, url): def http_request(self, url, headers=None):
req = Request(url, headers=headers or {})
with urlopen(req) as content:
try:
content_type = dict(content.getheaders())['Content-Type']
charset = re.search(r'charset=(.*)', content_type).group(1)
except AttributeError:
charset = 'utf-8'
return content.read().decode(charset)
@require(internet)
def api_request(self, url, headers=None):
self.logger.debug('Making API request to %s', url) self.logger.debug('Making API request to %s', url)
try: try:
with urlopen(url) as content: response_json = self.http_request(url, headers=headers).strip()
try: if not response_json:
content_type = dict(content.getheaders())['Content-Type'] self.logger.debug('JSON response from %s was blank', url)
charset = re.search(r'charset=(.*)', content_type).group(1) return {}
except AttributeError: try:
charset = 'utf-8' response = json.loads(response_json)
response_json = content.read().decode(charset).strip() except json.decoder.JSONDecodeError as exc:
if not response_json: self.logger.error('Error loading JSON: %s', exc)
self.logger.debug('JSON response from %s was blank', url) self.logger.debug('JSON text that failed to load: %s',
return {} response_json)
try: return {}
response = json.loads(response_json) self.logger.log(5, 'API response: %s', response)
except json.decoder.JSONDecodeError as exc: error = self.check_response(response)
self.logger.error('Error loading JSON: %s', exc) if error:
self.logger.debug('JSON text that failed to load: %s', self.logger.error('Error in JSON response: %s', error)
response_json) return {}
return {} return response
self.logger.log(5, 'API response: %s', response)
error = self.check_response(response)
if error:
self.logger.error('Error in JSON response: %s', error)
return {}
return response
except Exception as exc: except Exception as exc:
self.logger.error( self.logger.error(
'Failed to make API request to %s. Exception follows:', url, 'Failed to make API request to %s. Exception follows:', url,
@ -45,8 +50,8 @@ class WeatherBackend(SettingsBase):
) )
return {} return {}
def check_response(response): def check_response(self, response):
raise NotImplementedError return False
class Weather(IntervalModule): class Weather(IntervalModule):
@ -99,21 +104,6 @@ class Weather(IntervalModule):
syntax to conditionally show the value of the **update_error** config value syntax to conditionally show the value of the **update_error** config value
when the backend encounters an error during an update. when the backend encounters an error during an update.
The extended string format syntax also comes in handy for the
:py:mod:`weathercom <.weather.weathercom>` backend, which at a certain
point in the afternoon will have a blank ``{high_temp}`` value. Using the
following snippet in your format string will only display the high
temperature information if it is not blank:
::
{current_temp}{temp_unit}[ Hi: {high_temp}] Lo: {low_temp}[ {update_error}]
Brackets are evaluated from the outside-in, so the fact that the only
formatter in the outer block (``{high_temp}``) is empty would keep the
inner block from being evaluated at all, and entire block would not be
displayed.
See the following links for usage examples for the available weather See the following links for usage examples for the available weather
backends: backends:

View File

@ -7,8 +7,6 @@ from urllib.request import Request, urlopen
from i3pystatus.core.util import internet, require from i3pystatus.core.util import internet, require
from i3pystatus.weather import WeatherBackend from i3pystatus.weather import WeatherBackend
USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0'
class WeathercomHTMLParser(HTMLParser): class WeathercomHTMLParser(HTMLParser):
''' '''
@ -16,6 +14,7 @@ class WeathercomHTMLParser(HTMLParser):
through some other source at runtime and added as <script> elements to the through some other source at runtime and added as <script> elements to the
page source. page source.
''' '''
user_agent = 'Mozilla/5.0 (X11; Linux x86_64; rv:80.0) Gecko/20100101 Firefox/80.0'
def __init__(self, logger): def __init__(self, logger):
self.logger = logger self.logger = logger
@ -24,7 +23,7 @@ class WeathercomHTMLParser(HTMLParser):
def get_weather_data(self, url): def get_weather_data(self, url):
self.logger.debug('Making request to %s to retrieve weather data', url) self.logger.debug('Making request to %s to retrieve weather data', url)
self.weather_data = None self.weather_data = None
req = Request(url, headers={'User-Agent': USER_AGENT}) req = Request(url, headers={'User-Agent': self.user_agent})
with urlopen(req) as content: with urlopen(req) as content:
try: try:
content_type = dict(content.getheaders())['Content-Type'] content_type = dict(content.getheaders())['Content-Type']
@ -98,7 +97,8 @@ class Weathercom(WeatherBackend):
parameter should be set to the location code from weather.com. To obtain parameter should be set to the location code from weather.com. To obtain
this code, search for your location on weather.com, and when you go to the this code, search for your location on weather.com, and when you go to the
forecast page, the code you need will be everything after the last slash in forecast page, the code you need will be everything after the last slash in
the URL (e.g. ``94107:4:US``). the URL (e.g. ``94107:4:US``, or
``8b7867695971473a260df2c5d49ff92dc9079dcb673c545f5f107f5c4ab30732``).
.. _weather-usage-weathercom: .. _weather-usage-weathercom:
@ -154,16 +154,11 @@ class Weathercom(WeatherBackend):
# Setting the locale to en-AU returns units in metric. Leaving it blank # Setting the locale to en-AU returns units in metric. Leaving it blank
# causes weather.com to return the default, which is imperial. # causes weather.com to return the default, which is imperial.
self.locale = 'en-AU' if self.units == 'metric' else '' self.locale = 'en-CA' if self.units == 'metric' else ''
self.forecast_url = self.url_template.format(**vars(self)) self.forecast_url = self.url_template.format(**vars(self))
self.parser = WeathercomHTMLParser(self.logger) self.parser = WeathercomHTMLParser(self.logger)
def check_response(self, response):
# Errors for weather.com API manifest in HTTP error codes, not in the
# JSON response.
return False
@require(internet) @require(internet)
def check_weather(self): def check_weather(self):
''' '''
@ -211,7 +206,7 @@ class Weathercom(WeatherBackend):
return return
try: try:
forecast = self.parser.weather_data['getSunV3DailyForecastUrlConfig'] forecast = self.parser.weather_data['getSunV3CurrentObservationsUrlConfig']
# Same as above, use next(iter(forecast)) to drill down to the # Same as above, use next(iter(forecast)) to drill down to the
# correct nested dict level. # correct nested dict level.
forecast = forecast[next(iter(forecast))]['data'] forecast = forecast[next(iter(forecast))]['data']
@ -255,14 +250,13 @@ class Weathercom(WeatherBackend):
else: else:
pressure_trend = '' pressure_trend = ''
self.logger.critical('forecast = %s', forecast)
try: try:
high_temp = forecast.get('temperatureMax', [])[0] or '' high_temp = forecast.get('temperatureMax24Hour', '')
except (AttributeError, IndexError): except (AttributeError, IndexError):
high_temp = '' high_temp = ''
try: try:
low_temp = forecast.get('temperatureMin', [])[0] low_temp = forecast.get('temperatureMin24Hour', '')
except (AttributeError, IndexError): except (AttributeError, IndexError):
low_temp = '' low_temp = ''

View File

@ -1,55 +1,36 @@
import re
from datetime import datetime
from urllib.request import Request, urlopen
from i3pystatus.core.util import internet, require from i3pystatus.core.util import internet, require
from i3pystatus.weather import WeatherBackend from i3pystatus.weather import WeatherBackend
from datetime import datetime
from urllib.request import urlopen
GEOLOOKUP_URL = 'http://api.wunderground.com/api/%s/geolookup%s/q/%s.json'
STATION_QUERY_URL = 'http://api.wunderground.com/api/%s/%s/q/%s.json'
class Wunderground(WeatherBackend): class Wunderground(WeatherBackend):
''' '''
This module retrieves weather data using the Weather Underground API. This module retrieves weather data from Weather Underground.
.. note:: .. note::
A Weather Underground API key is required to use this module, you can Previous versions of this module required an API key to work. Weather
sign up for a developer API key free at Underground has since discontinued their API, and this module has been
https://www.wunderground.com/weather/api/ rewritten to reflect that.
Valid values for ``location_code`` include: .. rubric:: Finding your weather station
* **State/City_Name** - CA/San_Francisco To use this module, you must provide a weather station code (as the
* **Country/City** - France/Paris ``location_code`` option). To find your weather station, first search for
* **Geolocation by IP** - autoip your city and click to view the current conditions. Below the city name you
* **Zip or Postal Code** - 60616 will see the station name, and to the right of that a ``CHANGE`` link.
* **ICAO Airport Code** - icao:LAX Clicking that link will display a map, where you can find the station
* **Latitude/Longitude** - 41.8301943,-87.6342619 closest to you. Clicking on that station will take you back to the current
* **Personal Weather Station (PWS)** - pws:KILCHICA30 conditions page. The weather station code will now be the last part of the
URL. For example:
When not using a ``pws`` or ``icao`` station ID, the location will be .. code-block:: text
queried (this uses an API query), and the closest station will be used.
For a list of PWS station IDs, visit the following URL:
http://www.wunderground.com/weatherstation/ListStations.asp https://www.wunderground.com/weather/us/ma/cambridge/KMACAMBR4
.. rubric:: API usage In this case, the weather station code would be ``KMACAMBR4``.
An API key is allowed 500 queries per day, and no more than 10 in a
given minute. Therefore, it is recommended to be conservative when
setting the update interval (the default is 1800 seconds, or 30
minutes), and one should be careful how often one restarts i3pystatus
and how often a refresh is forced by left-clicking the module.
As noted above, when not using a ``pws`` or ``icao`` station ID, an API
query will be used to determine the station ID to use. This will be
done once when i3pystatus is started, and not repeated until the next
time i3pystatus is started.
When updating weather data, one API query will be used to obtain the
current conditions. The high/low temperature forecast requires an
additonal API query, and is optional (disabled by default). To enable
forecast checking, set ``forecast=True``.
.. _weather-usage-wunderground: .. _weather-usage-wunderground:
@ -68,10 +49,8 @@ class Wunderground(WeatherBackend):
colorize=True, colorize=True,
hints={'markup': 'pango'}, hints={'markup': 'pango'},
backend=wunderground.Wunderground( backend=wunderground.Wunderground(
api_key='api_key_goes_here', location_code='KMACAMBR4',
location_code='pws:MAT645',
units='imperial', units='imperial',
forecast=True,
update_error='<span color="#ff0000">!</span>', update_error='<span color="#ff0000">!</span>',
), ),
) )
@ -82,204 +61,170 @@ class Wunderground(WeatherBackend):
used. used.
''' '''
settings = ( settings = (
('api_key', 'Weather Underground API key'),
('location_code', 'Location code from wunderground.com'), ('location_code', 'Location code from wunderground.com'),
('units', '\'metric\' or \'imperial\''), ('units', '\'metric\' or \'imperial\''),
('use_pws', 'Set to False to use only airport stations'),
('forecast', 'Set to ``True`` to check forecast (generates one '
'additional API request per weather update). If set to '
'``False``, then the ``low_temp`` and ``high_temp`` '
'formatters will be set to empty strings.'),
('update_error', 'Value for the ``{update_error}`` formatter when an ' ('update_error', 'Value for the ``{update_error}`` formatter when an '
'error is encountered while checking weather data'), 'error is encountered while checking weather data'),
) )
required = ('api_key', 'location_code') required = ('location_code',)
api_key = None
location_code = None location_code = None
units = 'metric' units = 'metric'
use_pws = True
forecast = False
update_error = '!' update_error = '!'
# These will be set once weather data has been checked url_template = 'https://www.wunderground.com/dashboard/pws/{location_code}'
station_id = None
# This will be set in the init based on the passed location code
forecast_url = None forecast_url = None
summary_url = 'https://api.weather.com/v2/pws/dailysummary/1day?apiKey={api_key}&stationId={location_code}&format=json&units={units_type}'
observation_url = 'https://api.weather.com/v2/pws/observations/current?apiKey={api_key}&stationId={location_code}&format=json&units={units_type}'
overview_url = 'https://api.weather.com/v3/aggcommon/v3alertsHeadlines;v3-wx-observations-current;v3-location-point?apiKey={api_key}&geocodes={lat:.2f}%2C{lon:.2f}&language=en-US&units=e&format=json'
headers = {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:80.0) Gecko/20100101 Firefox/80.0',
'Referer': 'https://www.wunderground.com/dashboard/pws/{location_code}',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
}
def init(self): def init(self):
''' self.units_type = 'm' if self.units == 'metric' else 'e'
Use the location_code to perform a geolookup and find the closest self.forecast_url = self.url_template.format(**vars(self))
station. If the location is a pws or icao station ID, no lookup will be
peformed.
'''
try:
for no_lookup in ('pws', 'icao'):
sid = self.location_code.partition(no_lookup + ':')[-1]
if sid:
self.station_id = self.location_code
return
except AttributeError:
# Numeric or some other type, either way we'll just stringify
# it below and perform a lookup.
pass
self.get_station_id()
@require(internet) @require(internet)
def get_forecast(self): def get_api_key(self):
''' '''
If configured to do so, make an API request to retrieve the forecast Grab the API key out of the page source from the home page
data for the configured/queried weather station, and return the low and
high temperatures. Otherwise, return two empty strings.
''' '''
no_data = ('', '') url = 'https://www.wunderground.com'
if self.forecast: try:
query_url = STATION_QUERY_URL % (self.api_key, page_source = self.http_request(
'forecast', url,
self.station_id) headers={
'User-Agent': self.headers['User-Agent'],
'Accept-Language': self.headers['Accept-Language'],
'Conncetion': self.headers['Connection'],
},
)
except Exception as exc:
self.logger.exception('Failed to load %s', url)
else:
try: try:
response = self.api_request(query_url)['forecast'] return re.search(r'apiKey=([0-9a-f]+)', page_source).group(1)
response = response['simpleforecast']['forecastday'][0] except AttributeError:
except (KeyError, IndexError, TypeError): self.logger.error('Failed to find API key in mainpage source')
self.logger.error(
'No forecast data found for %s', self.station_id)
self.data['update_error'] = self.update_error
return no_data
unit = 'celsius' if self.units == 'metric' else 'fahrenheit'
low_temp = response.get('low', {}).get(unit, '')
high_temp = response.get('high', {}).get(unit, '')
return low_temp, high_temp
else:
return no_data
@require(internet) @require(internet)
def get_station_id(self): def api_request(self, url, headers=None):
''' if headers is None:
Use geolocation to get the station ID headers = {}
''' return super(Wunderground, self).api_request(
extra_opts = '/pws:0' if not self.use_pws else '' url,
api_url = GEOLOOKUP_URL % (self.api_key, headers=dict([(k, v.format(**vars(self))) for k, v in headers.items()]))
extra_opts,
self.location_code)
response = self.api_request(api_url)
station_type = 'pws' if self.use_pws else 'airport'
try:
stations = response['location']['nearby_weather_stations']
nearest = stations[station_type]['station'][0]
except (KeyError, IndexError):
raise Exception(
'No locations matched location_code %s' % self.location_code)
self.logger.error('nearest = %s', nearest)
if self.use_pws:
nearest_pws = nearest.get('id', '')
if not nearest_pws:
raise Exception('No id entry for nearest PWS')
self.station_id = 'pws:%s' % nearest_pws
else:
nearest_airport = nearest.get('icao', '')
if not nearest_airport:
raise Exception('No icao entry for nearest airport')
self.station_id = 'icao:%s' % nearest_airport
def check_response(self, response):
try:
return response['response']['error']['description']
except KeyError:
# No error in response
return False
@require(internet) @require(internet)
def check_weather(self): def check_weather(self):
''' '''
Query the configured/queried station and return the weather data Query the desired station and return the weather data
''' '''
if self.station_id is None: # Get the API key from the page source
# Failed to get the nearest station ID when first launched, so self.api_key = self.get_api_key()
# retry it. if self.api_key is None:
self.get_station_id() self.data['update_error'] = self.update_error
return
self.data['update_error'] = '' self.data['update_error'] = ''
try: try:
query_url = STATION_QUERY_URL % (self.api_key,
'conditions',
self.station_id)
try: try:
response = self.api_request(query_url)['current_observation'] summary = self.api_request(self.summary_url.format(**vars(self)))['summaries'][0]
self.forecast_url = response.pop('ob_url', None) except (IndexError, KeyError):
except KeyError: self.logger.error(
self.logger.error('No weather data found for %s', self.station_id) 'Failed to retrieve summary data from API response. '
'Run module with debug logging to get more information.'
)
self.data['update_error'] = self.update_error self.data['update_error'] = self.update_error
return return
if self.forecast: try:
query_url = STATION_QUERY_URL % (self.api_key, observation = self.api_request(self.observation_url.format(**vars(self)))['observations'][0]
'forecast', except (IndexError, KeyError):
self.station_id) self.logger.error(
try: 'Failed to retrieve observation data from API response. '
forecast = self.api_request(query_url)['forecast'] 'Run module with debug logging to get more information.'
forecast = forecast['simpleforecast']['forecastday'][0] )
except (KeyError, IndexError, TypeError): self.data['update_error'] = self.update_error
self.logger.error( return
'No forecast data found for %s', self.station_id)
# This is a non-fatal error, so don't return but do set the
# error flag.
self.data['update_error'] = self.update_error
unit = 'celsius' if self.units == 'metric' else 'fahrenheit' self.lat = observation['lat']
low_temp = forecast.get('low', {}).get(unit, '') self.lon = observation['lon']
high_temp = forecast.get('high', {}).get(unit, '')
else: try:
low_temp = high_temp = '' overview = self.api_request(self.overview_url.format(**vars(self)))[0]
except IndexError:
self.logger.error(
'Failed to retrieve overview data from API response. '
'Run module with debug logging to get more information.'
)
self.data['update_error'] = self.update_error
return
if self.units == 'metric': if self.units == 'metric':
temp_unit = 'c' temp_unit = '°C'
speed_unit = 'kph' speed_unit = 'kph'
distance_unit = 'km' distance_unit = 'km'
pressure_unit = 'mb' pressure_unit = 'mb'
else: else:
temp_unit = 'f' temp_unit = '°F'
speed_unit = 'mph' speed_unit = 'mph'
distance_unit = 'mi' distance_unit = 'mi'
pressure_unit = 'in' pressure_unit = 'in'
def _find(key, data=None, default=''):
if data is None:
data = response
return str(data.get(key, default))
try: try:
observation_epoch = _find('observation_epoch') or _find('local_epoch') observation_time_str = observation.get('obsTimeLocal', '')
observation_time = datetime.fromtimestamp(int(observation_epoch)) observation_time = datetime.strptime(observation_time_str,
except (TypeError, ValueError): '%Y-%m-%d %H:%M:%S')
log.debug( except (ValueError, AttributeError):
'Observation time \'%s\' is not a UNIX timestamp',
observation_epoch
)
observation_time = datetime.fromtimestamp(0) observation_time = datetime.fromtimestamp(0)
self.data['city'] = _find('city', response['observation_location']) def _find(path, data, default=''):
self.data['condition'] = _find('weather') ptr = data
try:
for item in path.split(':'):
if item == 'units':
item = self.units
ptr = ptr[item]
except (KeyError, IndexError, TypeError):
return default
return str(ptr)
pressure_tendency = _find(
'v3-wx-observations-current:pressureTendencyTrend',
overview).lower()
pressure_trend = '+' if pressure_tendency == 'rising' else '-'
self.data['city'] = _find('v3-location-point:location:city', overview)
self.data['condition'] = _find('v3-wx-observations-current:wxPhraseShort', overview)
self.data['observation_time'] = observation_time self.data['observation_time'] = observation_time
self.data['current_temp'] = _find('temp_' + temp_unit).split('.')[0] self.data['current_temp'] = _find('units:temp', observation, '0')
self.data['low_temp'] = low_temp self.data['low_temp'] = _find('units:tempLow', summary)
self.data['high_temp'] = high_temp self.data['high_temp'] = _find('units:tempHigh', summary)
self.data['temp_unit'] = '°' + temp_unit.upper() self.data['temp_unit'] = temp_unit
self.data['feelslike'] = _find('feelslike_' + temp_unit) self.data['feelslike'] = _find('units:heatIndex', observation)
self.data['dewpoint'] = _find('dewpoint_' + temp_unit) self.data['dewpoint'] = _find('units:dewpt', observation)
self.data['wind_speed'] = _find('wind_' + speed_unit) self.data['wind_speed'] = _find('units:windSpeed', observation)
self.data['wind_unit'] = speed_unit self.data['wind_unit'] = speed_unit
self.data['wind_direction'] = _find('wind_dir') self.data['wind_direction'] = _find('v3-wx-observations-current:windDirectionCardinal', overview)
self.data['wind_gust'] = _find('wind_gust_' + speed_unit) self.data['wind_gust'] = _find('units:windGust', observation)
self.data['pressure'] = _find('pressure_' + pressure_unit) self.data['pressure'] = _find('units:pressure', observation)
self.data['pressure_unit'] = pressure_unit self.data['pressure_unit'] = pressure_unit
self.data['pressure_trend'] = _find('pressure_trend') self.data['pressure_trend'] = pressure_trend
self.data['visibility'] = _find('visibility_' + distance_unit) self.data['visibility'] = _find('v3-wx-observations-current:visibility', overview)
self.data['visibility_unit'] = distance_unit self.data['visibility_unit'] = distance_unit
self.data['humidity'] = _find('relative_humidity').rstrip('%') self.data['humidity'] = _find('humidity', observation)
self.data['uv_index'] = _find('UV') self.data['uv_index'] = _find('uv', observation)
except Exception: except Exception:
# Don't let an uncaught exception kill the update thread # Don't let an uncaught exception kill the update thread
self.logger.error( self.logger.error(