college/Summer-2024/CS-3443/LakeWatch/LakeWatchScraper/lakewatchscraper/ramps/comalramp.py

35 lines
1.5 KiB
Python

import datetime
from typing import Any, override
from urllib import request
import json
import http.client
from lakewatchscraper.ramps.ramp import Ramp, RampStatus
class ComalRamp(Ramp):
def parse_data(self, data: dict[Any, Any]):
for ramp_info in data["features"]:
ramp_info: dict[str, dict[str, str]]
ramp_attr = ramp_info["attributes"]
if int(ramp_attr["RampNumber"]) == self.ramp_number:
match ramp_attr["Status"]:
case "OPEN":
self.status = RampStatus.OPEN
case "CLOSED":
self.status = RampStatus.CLOSED
case _:
self.status = RampStatus.UNKNOWN
@override
def fetch_data(self):
# TODO: Cache this request
url = "https://cceo.co.comal.tx.us/arcgispor/sharing/servers/bc3ece11d9c343a58cc0e6f7496bc889/rest/services/EditFeature/BoatRampsClosures/MapServer/0/query?f=json&resultOffset=0&resultRecordCount=1000&where=1=1&outFields=RampNumber,Status&returnZ=true&spatialRel=esriSpatialRelIntersects"
with request.urlopen(url) as data:
data: http.client.HTTPResponse
ramp_data_cache: dict[Any, Any] = json.loads(
"\n".join([line.decode("utf-8") for line in data.readlines()])
)
last_updated_time = datetime.datetime.now()
self.parse_data(ramp_data_cache)
self.last_updated = last_updated_time