Source code for pykit.networktables.nt4Publisher
from ntcore import (
GenericPublisher,
IntegerPublisher,
NetworkTable,
NetworkTableInstance,
PubSubOptions,
)
from pykit.logdatareciever import LogDataReciever
from pykit.logtable import LogTable
from pykit.logvalue import LogValue
[docs]
class NT4Publisher(LogDataReciever):
"""
A data receiver that publishes log data to NetworkTables.
This class listens for log table updates and publishes the data
to a specified NetworkTable, allowing for real-time monitoring.
"""
pykitTable: NetworkTable
lastTable: LogTable = LogTable(0)
timestampPublisher: IntegerPublisher
publishers: dict[str, GenericPublisher] = {}
units: dict[str, str] = {}
def __init__(self, actLikeAKit: bool = False):
"""
Initializes the NT4Publisher.
:param actLikeAKit: If True, publishes to the "/AdvantageKit" table.
Otherwise, publishes to the "/PyKit" table.
"""
self.pykitTable = NetworkTableInstance.getDefault().getTable(
"/AdvantageKit" if actLikeAKit else "/PyKit"
)
options = PubSubOptions()
options.sendAll = True
self.timestampPublisher = self.pykitTable.getIntegerTopic(
self.timestampKey[1:]
).publish(options)
[docs]
def putTable(self, table: LogTable):
"""
Publishes the contents of a LogTable to NetworkTables.
This method compares the new table with the last one received and only
publishes the values that have changed.
:param table: The LogTable to publish.
"""
self.timestampPublisher.set(table.getTimestamp(), table.getTimestamp())
# Compare with previous table to only publish changes
newMap = table.getAll(False)
oldMap = self.lastTable.getAll(False)
for key, newValue in newMap.items():
if newValue == oldMap.get(key):
continue
key = key[1:]
unit = newValue.unit
# Create publisher for new topics
publisher = self.publishers.get(key)
if publisher is None:
publisher = self.pykitTable.getTopic(key).genericPublish(
newValue.getNT4Type()
)
self.publishers[key] = publisher
if unit is not None:
self.pykitTable.getTopic(key).setProperty("unit", unit)
self.units[key] = unit
# Update unit if it has changed
if unit is not None and self.units.get(key) != unit:
self.pykitTable.getTopic(key).setProperty("unit", unit)
self.units[key] = unit
if unit is not None:
print(self.pykitTable.getTopic(key).getProperties())
match newValue.log_type:
case LogValue.LoggableType.Raw:
publisher.setRaw(newValue.value, table.getTimestamp())
case LogValue.LoggableType.Boolean:
publisher.setBoolean(newValue.value, table.getTimestamp())
case LogValue.LoggableType.Integer:
publisher.setInteger(newValue.value, table.getTimestamp())
case LogValue.LoggableType.Float:
publisher.setFloat(newValue.value, table.getTimestamp())
case LogValue.LoggableType.Double:
publisher.setDouble(newValue.value, table.getTimestamp())
case LogValue.LoggableType.String:
publisher.setString(newValue.value, table.getTimestamp())
case LogValue.LoggableType.BooleanArray:
publisher.setBooleanArray(newValue.value, table.getTimestamp())
case LogValue.LoggableType.IntegerArray:
publisher.setIntegerArray(newValue.value, table.getTimestamp())
case LogValue.LoggableType.FloatArray:
publisher.setFloatArray(newValue.value, table.getTimestamp())
case LogValue.LoggableType.DoubleArray:
publisher.setDoubleArray(newValue.value, table.getTimestamp())
case LogValue.LoggableType.StringArray:
publisher.setStringArray(newValue.value, table.getTimestamp())