import os
import random
import datetime
from tempfile import gettempdir
from typing import TYPE_CHECKING
from hal import MatchType
from wpilib import RobotBase, RobotController
from wpiutil import DataLogWriter
from pykit.logdatareciever import LogDataReciever
from pykit.logger import Logger
from pykit.logtable import LogTable
from pykit.logvalue import LogValue
from pykit.wpilog import wpilogconstants
if TYPE_CHECKING:
from wpiutil.log import DataLog
ASCOPE_FILENAME = "ascope-log-path.txt"
[docs]
class WPILOGWriter(LogDataReciever):
"""
A data receiver that writes log data to a `.wpilog` file.
This class handles the creation and writing of log files in the standard
WPILib format, including automatic file naming and handling of data types.
"""
log: "DataLog"
defaultPathRio: str = "/U/logs"
defaultPathSim: str = "pyLogs"
folder: str
filename: str
randomIdentifier: str
dsAttachedTime: int = 0
autoRename: bool
logDate: datetime.datetime | None
logMatchText: str
isOpen: bool = False
lastTable: LogTable
timestampId: int
entryIds: dict[str, int]
entryTypes: dict[str, LogValue.LoggableType]
entryUnits: dict[str, str]
def __init__(self, filename: str | None = None, path: str | None = None) -> None:
"""
Initializes the WPILOGWriter.
:param filename: The path to the `.wpilog` file. If None, a default path is used,
and the file is named with a random identifier.
:param path: The directory to save the log file. If None, a default path is used based
on whether it's running in simulation or on the robot.
in the event that both a filename and a path are provided, the combination of the path and
the filename will be used in determining the location of where to put the log file
"""
actualPath = (
(self.defaultPathSim if RobotBase.isSimulation() else self.defaultPathRio)
if path is None
else path
)
self.randomIdentifier = f"{random.randint(0, 0xFFFF):04X}"
if path is None:
self.folder = os.path.abspath(
os.path.dirname(filename) if filename is not None else actualPath
)
else:
# need to combine if both are specified
self.folder = os.path.abspath(
os.path.join(actualPath, os.path.dirname(filename))
if filename is not None
else actualPath
)
self.filename = (
os.path.basename(filename)
if filename is not None
else f"pykit_{self.randomIdentifier}.wpilog"
)
self.autoRename = filename is None
[docs]
def start(self) -> None:
"""
Initializes the writer by creating the log file and preparing to write data.
"""
# Create folder if necessary
if not os.path.exists(self.folder):
try:
os.makedirs(self.folder)
except PermissionError as e:
print(f"[WPILogWriter] Failed to create log folder! ({e})")
return
# Initialize the WPILOG file
fullPath = os.path.join(self.folder, self.filename)
print(f"[WPILogWriter] Creating WPILOG file at {fullPath}")
if os.path.exists(fullPath):
print("[WPILogWriter] File exists, overwriting")
os.remove(fullPath)
try:
self.log = DataLogWriter(fullPath, wpilogconstants.extraHeader)
except PermissionError as e:
print(f"[WPILogWriter] Failed to open WPILOG file! ({e})")
return
self.isOpen = True
self.timestampId = self.log.start(
self.timestampKey,
LogValue.LoggableType.Integer.getWPILOGType(),
wpilogconstants.entryMetadata,
0,
)
self.lastTable = LogTable(0)
self.entryIds: dict[str, int] = {}
self.entryTypes: dict[str, LogValue.LoggableType] = {}
self.entryUnits: dict[str, str] = {}
self.logDate = None
self.logMatchText = ""
[docs]
def end(self) -> None:
"""
Closes the log file and performs cleanup.
In simulation, it can also trigger AdvantageScope to open the log.
"""
print("[WPILogWriter] Shutting down")
self.log.flush()
self.log.stop()
if RobotBase.isSimulation() and Logger.isReplay():
# open ascope
fullpath = os.path.join(gettempdir(), ASCOPE_FILENAME)
if not os.path.exists(gettempdir()):
return
fullLogPath = os.path.abspath(os.path.join(self.folder, self.filename))
print(f"Sending {fullLogPath} to AScope")
with open(fullpath, "w", encoding="utf-8") as f:
f.write(fullLogPath)
# DataLogManager.stop()
[docs]
def putTable(self, table: LogTable) -> None:
"""
Writes a `LogTable` to the `.wpilog` file.
This method handles automatic file renaming, writing timestamp and data entries,
and ensures that data is only written when it changes.
:param table: The `LogTable` to write.
"""
if not self.isOpen:
return
if self.autoRename:
# Auto-rename log file based on timestamp and match info
if self.logDate is None:
if (
table.get("DriverStation/DSAttached", False)
and table.get("SystemStats/SystemTimeValid", False)
) or RobotBase.isSimulation():
if self.dsAttachedTime == 0:
self.dsAttachedTime = RobotController.getFPGATime() / 1e6
elif (
RobotController.getFPGATime() / 1e6 - self.dsAttachedTime
) > 5 or RobotBase.isSimulation():
self.logDate = datetime.datetime.now()
else:
self.dsAttachedTime = 0
matchType: MatchType
match table.get("DriverStation/MatchType", 0):
case 1:
matchType = MatchType.practice
case 2:
matchType = MatchType.qualification
case 3:
matchType = MatchType.elimination
case _:
matchType = MatchType.none
# Build match text prefix (p/q/e + match number)
if self.logMatchText == "" and matchType != MatchType.none:
match matchType:
case MatchType.practice:
self.logMatchText = "p"
case MatchType.qualification:
self.logMatchText = "q"
case MatchType.elimination:
self.logMatchText = "e"
case _:
self.logMatchText = "u"
self.logMatchText += str(table.get("DriverStation/MatchNumber", 0))
# Generate new filename with timestamp, event, and match info
filename = "pykit_"
if self.logDate is not None:
filename += self.logDate.strftime("%Y%m%d_%H%M%S")
else:
filename += self.randomIdentifier
eventName = (
table.get("DriverStation/EventName", "").lower().replace(" ", "_")
)
if eventName != "":
filename += f"_{eventName}"
if self.logMatchText != "":
filename += f"_{self.logMatchText}"
filename += ".wpilog"
if self.filename != filename:
# Rename log file by closing current and opening new
print(f"[WPILogWriter] Renaming log to {filename}")
fullPath = os.path.join(self.folder, self.filename)
os.rename(fullPath, os.path.join(self.folder, filename))
self.filename = filename
# Write timestamp entry
self.log.appendInteger(
self.timestampId, table.getTimestamp(), table.getTimestamp()
)
# Get current and previous data for change detection
newMap = table.getAll()
oldMap = self.lastTable.getAll()
# Write changed entries to log
for key, newValue in newMap.items():
fieldType = newValue.log_type
fieldUnit = newValue.unit
appendData = False
# Register new field or detect changes
if key not in self.entryIds:
# New field - create entry in log
entryId = self.log.start(
key,
newValue.getWPILOGType(),
(
wpilogconstants.entryMetadata
if fieldUnit is None
else wpilogconstants.entryMetadataUnits.replace(
"$UNITSTR", fieldUnit
)
),
table.getTimestamp(),
)
self.entryIds[key] = entryId
self.entryTypes[key] = newValue.log_type
if fieldUnit is not None:
self.entryUnits[key] = fieldUnit
appendData = True
elif newValue != oldMap.get(key):
# Existing field changed - log new value
appendData = True
# Detect and warn about type changes
elif newValue.log_type != self.entryTypes[key]:
print(
f"[WPILOGWriter] Type of {key} changed from "
f"{self.entryTypes[key]} to {newValue.log_type}, skipping log"
)
continue
if appendData:
entryId = self.entryIds[key]
# check if unit changed
if fieldUnit is not None and self.entryUnits.get(key) != fieldUnit:
self.log.setMetadata(
entryId,
wpilogconstants.entryMetadataUnits.replace(
"$UNITSTR", fieldUnit
),
table.getTimestamp(),
)
self.entryUnits[key] = fieldUnit
match fieldType:
case LogValue.LoggableType.Raw:
self.log.appendRaw(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.Boolean:
self.log.appendBoolean(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.Integer:
self.log.appendInteger(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.Float:
self.log.appendFloat(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.Double:
self.log.appendDouble(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.String:
self.log.appendString(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.BooleanArray:
self.log.appendBooleanArray(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.IntegerArray:
self.log.appendIntegerArray(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.FloatArray:
self.log.appendFloatArray(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.DoubleArray:
self.log.appendDoubleArray(
entryId, newValue.value, table.getTimestamp()
)
case LogValue.LoggableType.StringArray:
self.log.appendStringArray(
entryId, newValue.value, table.getTimestamp()
)
self.log.flush()
self.lastTable = table