Source code for pykit.logtable

from typing import Any, Optional, Set, cast

from wpiutil import wpistruct
from pykit.logvalue import LogValue


[docs] class LogTable: """ Represents a table of loggable values for a single timestamp. It stores data as key-value pairs where keys are strings and values are `LogValue` objects. """ prefix: str depth: int _timestamp: int data: dict[str, LogValue] def __init__(self, timestamp: int, prefix="/") -> None: """ Initializes a new LogTable. :param timestamp: The timestamp for the log entries in this table, in microseconds. :param prefix: The prefix for all keys in this table. """ self._timestamp = timestamp self.prefix = prefix self.depth = 0 self.data: dict[str, LogValue] = {}
[docs] @staticmethod def clone(source: "LogTable"): """ Creates a shallow copy of a LogTable. :param source: The LogTable to clone. :return: A new LogTable instance with the same data. """ data: dict[str, LogValue] = {} for item, value in source.data.items(): data[item] = value newTable = LogTable(source._timestamp, source.prefix) newTable.data = data return newTable
[docs] def getTimestamp(self) -> int: """ Returns the timestamp of the log table. :return: The timestamp in microseconds. """ return self._timestamp
[docs] def setTimestamp(self, timestamp: int) -> None: """ Sets the timestamp of the log table. :param timestamp: The new timestamp in microseconds. """ self._timestamp = timestamp
[docs] def writeAllowed( self, key: str, logType: LogValue.LoggableType, customType: str, ) -> bool: """ Checks if a write operation is allowed for a given key and type. Prevents changing the type of an existing log entry. :param key: The key of the log entry. :param logType: The `LoggableType` of the new value. :param customType: The custom type string of the new value. :return: True if writing is allowed, False otherwise. """ if (currentVal := self.data.get(self.prefix + key)) is None: return True if currentVal.log_type != logType: print( f"Failed to write {key}:\nAttempted {logType} but type is {currentVal.log_type}" ) return False if customType != currentVal.custom_type: print( f"Failed to write {key}:\nAttempted {customType} but type is {currentVal.custom_type}" ) return False return True
[docs] def addStructSchemaNest(self, structname: str, schema: str): """ Adds the schema for a nested WPILib struct to the log table. :param structname: The name of the struct. :param schema: The schema string of the struct. """ typeString = structname key = "/.schema/" + typeString if key in self.data.keys(): return self.data[key] = LogValue(schema.encode(), "structschema")
[docs] def addStructSchema(self, struct: Any, seen: Set[str]): """ Adds the schema for a WPILib struct and its nested structs to the log table. :param struct: The struct instance. :param seen: A set of already processed type strings to avoid recursion loops. """ # Add struct schema definition to log for replay compatibility typeString = "struct:" + wpistruct.getTypeName(struct.__class__) key = "/.schema/" + typeString if key in self.data.keys(): return seen.add(typeString) schema = wpistruct.getSchema(struct.__class__) self.data[key] = LogValue(schema.encode(), "structschema") # Recursively add schemas for nested struct types wpistruct.forEachNested(struct.__class__, self.addStructSchemaNest) seen.remove(typeString)
[docs] def put(self, key: str, value: Any, typeStr: str = "", unit: Optional[str] = None): """ Puts a value into the log table, automatically handling WPILib structs and arrays. The value is wrapped in a `LogValue` object. :param key: The key for the log entry. :param value: The value to be logged. :param typeStr: An optional custom type string. """ if hasattr(value, "WPIStruct"): # Handle WPILib struct types - serialize and add schema self.addStructSchema(value, set()) log_value = LogValue( wpistruct.pack(value), "struct:" + wpistruct.getTypeName(value.__class__), ) elif ( hasattr(value, "__iter__") and len(value) > 0 and hasattr(value[0], "WPIStruct") ): # Handle arrays of struct types self.addStructSchema(value[0], set()) log_value = LogValue( wpistruct.packArray(value), "struct:" + wpistruct.getTypeName(value[0].__class__) + "[]", ) else: log_value = LogValue(value, typeStr, unit) self.putValue(key, log_value)
[docs] def putValue(self, key: str, log_value: LogValue): """ Puts a `LogValue` object into the log table. :param key: The key for the log entry. :param log_value: The `LogValue` object to be stored. """ # Handle empty array edge case - match type to previous entry to avoid type mismatch if isinstance(log_value.value, list) and len(log_value.value) == 0: currentVal = self.data.get(self.prefix + key) if currentVal is not None: log_value.log_type = currentVal.log_type log_value.custom_type = currentVal.custom_type if currentVal.custom_type.startswith("struct"): # Struct logging uses raw bytes, so empty array needs empty bytes log_value.value = b"" else: # Don't log if no previous entry to match type against return if self.writeAllowed(key, log_value.log_type, log_value.custom_type): self.data[self.prefix + key] = log_value else: print(f"Failed to insert {log_value.value}")
[docs] def get(self, key: str, defaultValue: Any) -> Any: """ Gets a value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found. :return: The retrieved value or the default value. """ if (log_value := self.data.get(self.prefix + key)) is not None: return log_value.value return defaultValue
[docs] def getRaw(self, key: str, defaultValue: bytes) -> bytes: """ Gets a raw (bytes) value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.Raw: return cast(bytes, log_value.value) return defaultValue
[docs] def getBoolean(self, key: str, defaultValue: bool) -> bool: """ Gets a boolean value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.Boolean: return cast(bool, log_value.value) return defaultValue
[docs] def getInteger(self, key: str, defaultValue: int) -> int: """ Gets an integer value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.Integer: return cast(int, log_value.value) return defaultValue
[docs] def getFloat(self, key: str, defaultValue: float) -> float: """ Gets a float value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.Float: return cast(float, log_value.value) return defaultValue
[docs] def getDouble(self, key: str, defaultValue: float) -> float: """ Gets a double value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.Double: return cast(float, log_value.value) return defaultValue
[docs] def getString(self, key: str, defaultValue: str) -> str: """ Gets a string value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.String: return cast(str, log_value.value) return defaultValue
[docs] def getBooleanArray(self, key: str, defaultValue: list[bool]) -> list[bool]: """ Gets a boolean array value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.BooleanArray: return cast(list[bool], log_value.value) return defaultValue
[docs] def getIntegerArray(self, key: str, defaultValue: list[int]) -> list[int]: """ Gets an integer array value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.IntegerArray: return cast(list[int], log_value.value) return defaultValue
[docs] def getFloatArray(self, key: str, defaultValue: list[float]) -> list[float]: """ Gets a float array value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.FloatArray: return cast(list[float], log_value.value) return defaultValue
[docs] def getDoubleArray(self, key: str, defaultValue: list[float]) -> list[float]: """ Gets a double array value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.DoubleArray: return cast(list[float], log_value.value) return defaultValue
[docs] def getStringArray(self, key: str, defaultValue: list[str]) -> list[str]: """ Gets a string array value from the log table. :param key: The key of the value to retrieve. :param defaultValue: The value to return if the key is not found or the type is incorrect. :return: The retrieved value or the default value. """ if ( log_value := self.data.get(self.prefix + key) ) is not None and log_value.log_type == LogValue.LoggableType.StringArray: return cast(list[str], log_value.value) return defaultValue
[docs] def getAll(self, subtableOnly: bool = False) -> dict[str, LogValue]: """ Returns all log values in the table. :param subtableOnly: If True, returns only the entries within the current subtable's prefix. :return: A dictionary of all log entries. """ if not subtableOnly: return self.data return { key: value for key, value in self.data.items() if key.startswith(self.prefix) }
[docs] def getSubTable(self, subtablePrefix: str) -> "LogTable": """ Returns a new `LogTable` instance representing a subtable of the current one. The new table shares the same underlying data but has an extended prefix. :param subtablePrefix: The prefix for the subtable. :return: A new `LogTable` for the specified subtable. """ subtable = LogTable(self.getTimestamp(), self.prefix + subtablePrefix + "/") subtable.data = self.data subtable.depth = self.depth + 1 return subtable