from typing import Any, Optional
import sys
import threading
from wpilib import RobotController
from pykit.alertlogger import AlertLogger
from pykit.autolog import AutoLogInputManager, AutoLogOutputManager
from pykit.inputs.loggableds import LoggedDriverStation
from pykit.inputs.loggablepowerdistribution import LoggedPowerDistribution
from pykit.inputs.loggablesystemstats import LoggedSystemStats
from pykit.logdatareciever import LogDataReciever
from pykit.logreplaysource import LogReplaySource
from pykit.logtable import LogTable
from pykit.networktables.loggednetworkinput import LoggedNetworkInput
class _ConsoleRecorder:
def __init__(self, orig):
self.orig = orig
self.lock = threading.Lock()
self.buffer = ""
def write(self, s):
try:
with self.lock:
# always write through to original stream
self.orig.write(s)
try:
self.orig.flush()
except (OSError, ValueError):
# I/O errors or writing to a closed stream
pass
# buffer until newline then record each line
self.buffer += s
while "\n" in self.buffer:
line, self.buffer = self.buffer.split("\n", 1)
try:
# Logger may not yet be initialized when class is defined; reference at runtime
Logger.recordOutput("Console", line)
except (AttributeError, RuntimeError, ValueError):
# Logger may not be ready or the logging backend raised an error
pass
except (OSError, ValueError, RuntimeError):
# Locking errors, I/O errors, or value errors from stream operations
pass
def flush(self):
if self.buffer:
Logger.recordOutput("Console", self.buffer)
self.buffer = ""
try:
self.orig.flush()
except (OSError, ValueError):
# I/O errors or writing to a closed stream
pass
[docs]
class Logger:
"""Manages the logging and replay of data for the robot."""
replaySource: Optional[LogReplaySource] = None
running: bool = False
cycleCount: int = 0
entry: LogTable = LogTable(0)
outputTable: LogTable = LogTable(0)
metadata: dict[str, str] = {}
checkConsole: bool = True
# Internal fields for console capturing
_orig_stdout: Optional[Any] = None
_orig_stderr: Optional[Any] = None
_console_wrapped: bool = False
_console_recorder_stdout: Optional[Any] = None
_console_recorder_stderr: Optional[Any] = None
dataRecievers: list[LogDataReciever] = []
dashboardInputs: list[LoggedNetworkInput] = []
[docs]
@classmethod
def setReplaySource(cls, replaySource: LogReplaySource):
"""
Sets the replay source for the logger.
:param replaySource: The `LogReplaySource` to use for replaying data.
"""
cls.replaySource = replaySource
[docs]
@classmethod
def isReplay(cls) -> bool:
"""
Checks if the logger is currently in replay mode.
:return: True if in replay mode, False otherwise.
"""
return cls.replaySource is not None
[docs]
@classmethod
def recordOutput(cls, key: str, value: Any, unit: Optional[str] = None):
"""
Records an output value to the log table.
This is only active when not in replay mode.
:param key: The key under which to record the value.
:param value: The value to record.
"""
if cls.running:
cls.outputTable.put(key, value, unit=unit)
[docs]
@classmethod
def addDataReciever(cls, reciever: LogDataReciever):
"""
Adds a data receiver to the logger.
:param reciever: The `LogDataReciever` to add.
"""
cls.dataRecievers.append(reciever)
[docs]
@classmethod
def start(cls):
"""
Starts the logger. This initializes logging or replay and sets up the necessary tables.
"""
if not cls.running:
cls.running = True
cls.cycleCount = 0
print("Logger started")
if cls.isReplay():
rs = cls.replaySource
if rs is not None:
rs.start()
if not cls.isReplay():
print("Logger in normal logging mode")
cls.outputTable = cls.entry.getSubTable("RealOutputs")
else:
print("Logger in replay mode")
cls.outputTable = cls.entry.getSubTable("ReplayOutputs")
metadataTable = cls.entry.getSubTable(
"ReplayMetadata" if cls.isReplay() else "RealMetadata"
)
for key, value in cls.metadata.items():
metadataTable.put(key, value)
# Setup console capture to record prints under "Console"
if cls.checkConsole and not cls._console_wrapped:
try:
cls._orig_stdout = sys.stdout
cls._orig_stderr = sys.stderr
cls._console_recorder_stdout = _ConsoleRecorder(cls._orig_stdout)
cls._console_recorder_stderr = _ConsoleRecorder(cls._orig_stderr)
sys.stdout = cls._console_recorder_stdout
sys.stderr = cls._console_recorder_stderr
cls._console_wrapped = True
except (AttributeError, RuntimeError, TypeError):
# If sys streams are missing or recorder construction failed
pass
RobotController.setTimeSource(cls.getTimestamp)
cls.periodicBeforeUser()
[docs]
@classmethod
def startReciever(cls):
"""Starts all registered data receivers."""
for reciever in cls.dataRecievers:
reciever.start()
[docs]
@classmethod
def end(cls):
"""Stops the logger and all data receivers, and performs necessary cleanup."""
if cls.running:
cls.running = False
print("Logger ended")
# Restore console if we wrapped it
if cls._console_wrapped:
try:
if cls._orig_stdout is not None:
sys.stdout = cls._orig_stdout
if cls._orig_stderr is not None:
sys.stderr = cls._orig_stderr
except (AttributeError, RuntimeError):
# Restoring original streams failed
pass
cls._console_wrapped = False
cls._console_recorder_stdout = None
cls._console_recorder_stderr = None
cls._orig_stdout = None
cls._orig_stderr = None
if cls.isReplay():
rs = cls.replaySource
if rs is not None:
rs.end()
RobotController.setTimeSource(RobotController.getFPGATime)
for reciever in cls.dataRecievers:
reciever.end()
[docs]
@classmethod
def getTimestamp(cls) -> int:
"""
Returns the current timestamp for logging.
In replay mode, it gets the timestamp from the log entry.
In normal mode, it gets the current FPGA timestamp.
:return: The current timestamp in microseconds.
"""
if cls.isReplay():
return cls.entry.getTimestamp()
# RobotController.getFPGATime may be untyped; ensure int
return int(RobotController.getFPGATime())
[docs]
@classmethod
def periodicBeforeUser(cls):
"""
Called periodically before the user's robot code.
This method updates the log table with new data, either from the replay source
or from the live robot hardware.
"""
cls.cycleCount += 1
if cls.running:
entryUpdateStart = RobotController.getFPGATime()
if not cls.isReplay():
# Normal mode: set current timestamp
cls.entry.setTimestamp(RobotController.getFPGATime())
else:
# Replay mode: load next timestamped data from log
rs = cls.replaySource
if rs is None or not rs.updateTable(cls.entry):
print("End of replay reached")
if cls.cycleCount == 1:
print(
"[ERROR] This robot did not start properly, is the replay logfile from PyKit?"
)
else:
cls.end()
raise SystemExit(0)
dsStart = RobotController.getFPGATime()
# In replay mode, simulate driver station inputs from log
if cls.isReplay():
LoggedDriverStation.loadFromTable(
cls.entry.getSubTable("DriverStation")
)
dashboardInputStart = RobotController.getFPGATime()
# Update dashboard inputs (choosers, etc.)
for dashInput in cls.dashboardInputs:
dashInput.periodic()
dashboardInputEnd = RobotController.getFPGATime()
cls.recordOutput(
"Logger/EntryUpdateMS", (dsStart - entryUpdateStart) / 1000.0
)
if cls.isReplay():
cls.recordOutput(
"Logger/DriverStationMS", (dashboardInputStart - dsStart) / 1000.0
)
cls.recordOutput(
"Logger/DashboardInputsMS",
(dashboardInputEnd - dashboardInputStart) / 1000.0,
)
[docs]
@classmethod
def periodicAfterUser(cls, userCodeLength: int, periodicBeforeLength: int):
"""
Called periodically after the user's robot code.
This method finalizes the log entry for the current cycle by recording outputs,
performance data, and then sends the log table to all registered receivers.
:param userCodeLength: The execution time of the user's code in microseconds.
:param periodicBeforeLength: The execution time of the `periodicBeforeUser` method in microseconds.
"""
if cls.running:
dsStart = RobotController.getFPGATime()
# In normal mode, save driver station state to log
if not cls.isReplay():
LoggedDriverStation.saveToTable(cls.entry.getSubTable("DriverStation"))
systemStart = RobotController.getFPGATime()
if not cls.isReplay():
LoggedSystemStats.saveToTable(cls.entry.getSubTable("SystemStats"))
LoggedPowerDistribution.getInstance().saveToTable(
cls.entry.getSubTable("PowerDistribution")
)
autoLogStart = RobotController.getFPGATime()
# Publish all auto-logged outputs
AutoLogOutputManager.publish_all(cls.outputTable)
alertLogStart = RobotController.getFPGATime()
AlertLogger.periodic(cls.outputTable)
alertLogEnd = RobotController.getFPGATime()
if not cls.isReplay():
cls.recordOutput(
"Logger/DriverStationMS", (systemStart - dsStart) / 1000.0
)
cls.recordOutput(
"Logger/SystemStatsMS", (autoLogStart - systemStart) / 1000.0
)
# Log all auto-logged inputs
for logged_input in AutoLogInputManager.getInputs():
logged_input.toLog(
cls.entry.getSubTable("/"),
"/" + logged_input.__class__.__name__,
)
cls.recordOutput(
"Logger/AutoLogOutputMS", (alertLogStart - autoLogStart) / 1000.0
)
cls.recordOutput(
"Logger/AlertLoggerMS", (alertLogEnd - alertLogStart) / 1000.0
)
cls.recordOutput("LoggedRobot/UserCodeMS", userCodeLength / 1000.0)
periodicAfterLength = alertLogEnd - dsStart
cls.recordOutput(
"LoggedRobot/LogPeriodicMS",
(periodicBeforeLength + periodicAfterLength) / 1000.0,
)
cls.recordOutput(
"LoggedRobot/FullCycleMS",
(periodicBeforeLength + userCodeLength + periodicAfterLength) / 1000.0,
)
# Send log table to all receivers (file writer, NetworkTables, etc.)
for reciever in cls.dataRecievers:
reciever.putTable(LogTable.clone(cls.entry))