from typing import Any, Iterator, TypeVar

from wpiutil.log import DataLogReader, DataLogRecord

from pykit.logreplaysource import LogReplaySource
from pykit.logtable import LogTable
from pykit.logvalue import LogValue
from pykit.wpilog import wpilogconstants
T = TypeVar("T")
[docs]
def safeNext(val: Iterator[T]) -> None | T:
    """
    Advances an iterator by one step without raising when it runs dry.

    :param val: The iterator to advance.
    :return: The item produced by the iterator, or None once it is exhausted.
    """
    # The two-argument form of next() absorbs StopIteration and hands back
    # the supplied default instead of propagating the exception.
    return next(val, None)
[docs]
class WPILOGReader(LogReplaySource):
"""
Reads a `.wpilog` file and provides the data as a replay source for the logger.
"""
timestamp: int | None
def __init__(self, filename: str) -> None:
"""
Initializes the WPILOGReader.
:param filename: The path to the `.wpilog` file.
"""
self.filename = filename
# Predeclare records to satisfy typing before start() initializes it
self.records: Iterator[DataLogRecord] = iter(())
[docs]
def start(self) -> None:
"""
Initializes the reader by opening the log file and preparing to read records.
"""
self.reader = DataLogReader(self.filename)
self.isValid = (
self.reader.isValid()
and self.reader.getExtraHeader() == wpilogconstants.extraHeader
)
self.records = iter(())
if self.isValid:
# Create a new iterator for the initial entry scan
self.records = iter(self.reader)
self.entryIds: dict[int, str] = {}
self.entryTypes: dict[int, LogValue.LoggableType] = {}
self.timestamp = None
self.entryCustomTypes: dict[int, str] = {}
else:
print(
"[WPILogReader] invalid data log!\n"
+ "WPILogReader MUST use a WPILog generated with a WPILOGWriter"
)
[docs]
def updateTable(self, table: LogTable) -> bool:
"""
Updates a LogTable with the next record from the log file.
This method iterates through the log records, populating the provided
`LogTable` with data corresponding to a single timestamp.
:param table: The `LogTable` to update.
:return: True if the table was updated and there may be more data,
False if the end of the log was reached.
"""
if not self.isValid:
return False
if self.timestamp is not None:
table.setTimestamp(self.timestamp)
keepLogging = False
while (record := safeNext(self.records)) is not None:
if record.isControl():
if record.isStart():
startData = record.getStartData()
self.entryIds[startData.entry] = startData.name
typeStr = startData.type
self.entryTypes[startData.entry] = (
LogValue.LoggableType.fromWPILOGType(typeStr)
)
if typeStr.startswith("struct:") or typeStr == "structschema":
self.entryCustomTypes[startData.entry] = typeStr
else:
entry = self.entryIds.get(record.getEntry())
if entry is not None:
if entry == self.timestampKey:
firsttimestamp = self.timestamp is None
self.timestamp = record.getInteger()
if firsttimestamp:
assert self.timestamp is not None
table.setTimestamp(self.timestamp)
else:
keepLogging = True # we still have a timestamp, just need to wait until next iter
break
elif (
self.timestamp is not None
and record.getTimestamp() == self.timestamp
):
entry = entry[1:]
if entry.startswith("ReplayOutputs"):
continue
customType = self.entryCustomTypes.get(record.getEntry())
entryType = self.entryTypes.get(record.getEntry())
if customType is None:
customType = ""
match entryType:
case LogValue.LoggableType.Raw:
table.putValue(
entry,
LogValue.withType(
entryType, record.getRaw(), customType
),
)
case LogValue.LoggableType.Boolean:
table.putValue(
entry,
LogValue.withType(
entryType, record.getBoolean(), customType
),
)
case LogValue.LoggableType.Integer:
table.putValue(
entry,
LogValue.withType(
entryType, record.getInteger(), customType
),
)
case LogValue.LoggableType.Float:
table.putValue(
entry,
LogValue.withType(
entryType, record.getFloat(), customType
),
)
case LogValue.LoggableType.Double:
table.putValue(
entry,
LogValue.withType(
entryType, record.getDouble(), customType
),
)
case LogValue.LoggableType.String:
table.putValue(
entry,
LogValue.withType(
entryType, record.getString(), customType
),
)
case LogValue.LoggableType.BooleanArray:
table.putValue(
entry,
LogValue.withType(
entryType, record.getBooleanArray(), customType
),
)
case LogValue.LoggableType.IntegerArray:
table.putValue(
entry,
LogValue.withType(
entryType, record.getIntegerArray(), customType
),
)
case LogValue.LoggableType.FloatArray:
table.putValue(
entry,
LogValue.withType(
entryType, record.getFloatArray(), customType
),
)
case LogValue.LoggableType.DoubleArray:
table.putValue(
entry,
LogValue.withType(
entryType, record.getDoubleArray(), customType
),
)
case LogValue.LoggableType.StringArray:
table.putValue(
entry,
LogValue.withType(
entryType, record.getStringArray(), customType
),
)
return keepLogging