const GtfsRealtimeBindings = require("gtfs-realtime-bindings"); const unzipper = require("unzipper"); const parse = require("csv-parse/sync").parse; const TRAIN_ROUTES = ["701", "703", "704", "720", "750"]; const URL = "https://apps.rideuta.com/tms/gtfs/"; const URL_ROUTES = ["Vehicle", "TripUpdate", "Alert"]; const RT_POLLING = 3000; let gtfs_data = null; let gtfs_rt_v = null; let gtfs_rt_t = null; let gtfs_rt_a = null; let gtfs_timestamp = null; let updatePromise = null; async function applyTripUpdates(stopTimes, tripId) { await updateGtfsRt(); if (!gtfs_rt_t) return stopTimes; const tripUpdateEntity = gtfs_rt_t.entity.find(e => e.tripUpdate?.trip?.tripId === tripId); if (!tripUpdateEntity || !tripUpdateEntity.tripUpdate) return stopTimes; const updates = tripUpdateEntity.tripUpdate.stopTimeUpdate || []; const updateMap = new Map(); updates.forEach(u => { if (u.stopId) updateMap.set(u.stopId, u); }); return stopTimes.map(st => { const update = updateMap.get(st.stop_id); if (!update) return { ...st }; return { ...st, arrival: update.arrival ? { time: update.arrival.time, delay: update.arrival.delay } : undefined, departure: update.departure ? { time: update.departure.time, delay: update.departure.delay } : undefined, scheduleRelationship: update.scheduleRelationship || 'SCHEDULED' }; }); } async function loadGtfsStaticInMemory() { const url = "https://apps.rideuta.com/tms/gtfs/Static"; const res = await fetch(url); if (!res.ok) throw new Error(`Failed: ${res.status} ${res.statusText}`); const zipBuffer = Buffer.from(await res.arrayBuffer()); const directory = await unzipper.Open.buffer(zipBuffer); const gtfs = {}; for (const entry of directory.files) { if (!entry.path.endsWith(".txt")) continue; const fileBuffer = await entry.buffer(); const text = fileBuffer.toString("utf8"); const rows = parse(text, { columns: true, skip_empty_lines: true }); const name = entry.path.replace(".txt", ""); gtfs[name] = rows; } return gtfs; } async function getTrains() { await updateGtfsRt(); if (!gtfs_rt_v) return []; const trains = []; gtfs_rt_v.entity.forEach(entity => { if (entity.vehicle) { const tripId = entity.vehicle.trip.tripId; const trip = gtfs_data.trips.find(t => t.trip_id === tripId); //const stopTimes = gtfs_data.stop_times.filter(st => st.trip_id === tripId); let route = null; if (trip) { route = gtfs_data.routes.find(r => r.route_id === trip.route_id); } if (route && TRAIN_ROUTES.find(r => r === route.route_short_name)) { trains.push({ vehicle: entity.vehicle, trip, //stopTimes, route }); } } }); return trains; } async function getTrainsByRoute(route) { const trains = await getTrains(); return trains.filter(t => t.route && t.route.route_short_name === route); } async function getBuses() { await updateGtfsRt(); if (!gtfs_rt_v) return []; const buses = []; gtfs_rt_v.entity.forEach(entity => { if (entity.vehicle) { const tripId = entity.vehicle.trip.tripId; const trip = gtfs_data.trips.find(t => t.trip_id === tripId); //const stopTimes = gtfs_data.stop_times.filter(st => st.trip_id === tripId); let route = null; if (trip) { route = gtfs_data.routes.find(r => r.route_id === trip.route_id); } if (route && !TRAIN_ROUTES.find(r => r === route.route_short_name)) { buses.push({ vehicle: entity.vehicle, trip, //stopTimes, route }); } } }); return buses; } async function getBusesByRoute(route) { const buses = await getBuses(); return buses.filter(b => b.route && b.route.route_short_name === route); } async function updateGtfsRt() { if (!gtfs_data) gtfs_data = await loadGtfsStaticInMemory(); const now = Date.now(); if (!gtfs_timestamp || now - gtfs_timestamp >= RT_POLLING) { if (!updatePromise) { updatePromise = (async () => { gtfs_timestamp = now; try { [gtfs_rt_v, gtfs_rt_t, gtfs_rt_a] = await Promise.all([ loadGtfsRt("Vehicle"), loadGtfsRt("TripUpdate"), loadGtfsRt("Alert") ]); } catch (e) { console.error("Failed to update RT feeds:", e); } finally { updatePromise = null; } })(); } await updatePromise; } return { gtfs_rt_v,gtfs_rt_t,gtfs_rt_a }; } async function loadGtfsRt(feedType = URL_ROUTES[0]) { const response = await fetch(URL + feedType); if (!response.ok) { throw new Error(`HTTP ${response.status}: ${response.statusText}`); } const buffer = new Uint8Array(await response.arrayBuffer()); return GtfsRealtimeBindings.transit_realtime.FeedMessage.decode(buffer); } async function getAlerts() { await updateGtfsRt(); const alerts = []; if (!gtfs_rt_a) return []; const routeMap = new Map(gtfs_data.routes.map(r => [r.route_id, r])); gtfs_rt_a.entity.forEach(entity => { const informedEntities = entity.alert?.informedEntity || []; const routes = informedEntities .map(e => e.routeId) .filter(Boolean) .map(routeId => routeMap.get(routeId)) .filter(Boolean); alerts.push({ alert: entity.alert, routes }); }); return alerts; } async function getAlertsByRoute(route) { const alerts = await getAlerts(); return alerts.filter(a => a.routes && a.routes.some(r => r.route_short_name === route) ); } async function getScheduleByRoute(route) { await updateGtfsRt(); const matchingRoutes = gtfs_data.routes.filter(r => r.route_short_name === route); if (!matchingRoutes.length) return []; const trips = gtfs_data.trips.filter(t => matchingRoutes.some(r => r.route_id === t.route_id)); const schedulePromises = trips.map(async trip => { let stopTimes = gtfs_data.stop_times.filter(st => st.trip_id === trip.trip_id); stopTimes = await applyTripUpdates(stopTimes, trip.trip_id); const routeObj = matchingRoutes.find(r => r.route_id === trip.route_id); return { trip, stopTimes, route: routeObj }; }); return await Promise.all(schedulePromises); } async function getScheduleByStationId(stopId) { await updateGtfsRt(); const stopTimes = gtfs_data.stop_times.filter(st => st.stop_id === stopId); const schedulePromises = stopTimes.map(async st => { const trip = gtfs_data.trips.find(t => t.trip_id === st.trip_id); const route = trip ? gtfs_data.routes.find(r => r.route_id === trip.route_id) : null; const updatedStopTimes = await applyTripUpdates([st], st.trip_id); return { stopTime: updatedStopTimes[0], trip, route }; }); return await Promise.all(schedulePromises); } async function getShapeByRoute(route) { if (!gtfs_data) { await loadGtfsStaticInMemory(); }; const routes = gtfs_data.routes.filter(r => r.route_short_name === route); if (!routes.length) return []; const shapeIds = new Set(); gtfs_data.trips.forEach(trip => { if (routes.some(r => r.route_id === trip.route_id) && trip.shape_id) { shapeIds.add(trip.shape_id); } }); const shapes = []; shapeIds.forEach(shapeId => { const points = gtfs_data.shapes .filter(s => s.shape_id === shapeId) .sort((a, b) => parseInt(a.shape_pt_sequence) - parseInt(b.shape_pt_sequence)) .map(s => ({ lat: parseFloat(s.shape_pt_lat), lon: parseFloat(s.shape_pt_lon) })); shapes.push({ shapeId, points }); }); return shapes; } async function getRoutes() { if (!gtfs_data) { gtfs_data = await loadGtfsStaticInMemory(); } return gtfs_data.routes || []; } async function getStops() { if (!gtfs_data) { gtfs_data = await loadGtfsStaticInMemory(); } return gtfs_data.stops || []; } async function getStopsByRoute(route) { if (!gtfs_data) { gtfs_data = await loadGtfsStaticInMemory(); } const matchingRoutes = gtfs_data.routes.filter(r => r.route_short_name === route); if (!matchingRoutes.length) return []; const trips = gtfs_data.trips.filter(t => matchingRoutes.some(r => r.route_id === t.route_id)); const stopIds = new Set( gtfs_data.stop_times .filter(st => trips.some(trip => trip.trip_id === st.trip_id)) .map(st => st.stop_id) ); return gtfs_data.stops.filter(s => stopIds.has(s.stop_id)); } setInterval(async () => { console.log("Refreshing static GTFS..."); gtfs_data = await loadGtfsStaticInMemory(); }, 24 * 60 * 60 * 1000); module.exports = { getStops, getStopsByRoute, getRoutes, getTrains, getBuses, getTrainsByRoute, getBusesByRoute, getAlerts, getAlertsByRoute, getScheduleByRoute, getScheduleByStationId, getShapeByRoute, updateGtfsRt, loadGtfsStaticInMemory, }