API Documentation
Communication module. Provides generic abstraction for communication and commonly used concrete implementations.
- class com_interface.ComInterface
Bases: ABC
Generic form of a communication interface to separate communication logic from the underlying interface.
- abstract close(args: Any = 0) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- abstract initialize(args: Any = 0) Any
Performs initialization steps which cannot be done in the constructor or which require return values.
- abstract is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- abstract open(args: Any = 0) None
Opens the communication interface to allow communication.
- Returns:
- abstract packets_available(parameters: Any = 0) int
Poll whether packets are available.
- Parameters:
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
Number of packets available.
- abstract receive(parameters: Any = 0) list[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.
- Returns:
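A minimal sketch of how a concrete class might satisfy this interface. The `ComInterface` stand-in below only mirrors the abstract methods documented above so that the example runs on its own. `LoopbackComIF`, its `inject` helper and the `drain` function are hypothetical names used for illustration and are not part of the module.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any


class ComInterface(ABC):
    """Stand-in mirroring the documented abstract methods (assumption)."""

    @abstractmethod
    def initialize(self, args: Any = 0) -> Any: ...

    @abstractmethod
    def open(self, args: Any = 0) -> None: ...

    @abstractmethod
    def is_open(self) -> bool: ...

    @abstractmethod
    def close(self, args: Any = 0) -> None: ...

    @abstractmethod
    def packets_available(self, parameters: Any = 0) -> int: ...

    @abstractmethod
    def receive(self, parameters: Any = 0) -> list[bytes]: ...


class LoopbackComIF(ComInterface):
    """Hypothetical in-memory implementation backed by a deque."""

    def __init__(self) -> None:
        self._open = False
        self._queue: deque = deque()

    def initialize(self, args: Any = 0) -> Any:
        return None

    def open(self, args: Any = 0) -> None:
        self._open = True

    def is_open(self) -> bool:
        return self._open

    def close(self, args: Any = 0) -> None:
        self._open = False
        self._queue.clear()

    def inject(self, packet: bytes) -> None:
        # Hypothetical helper standing in for a receiver thread.
        self._queue.append(packet)

    def packets_available(self, parameters: Any = 0) -> int:
        return len(self._queue)

    def receive(self, parameters: Any = 0) -> list[bytes]:
        packets = list(self._queue)
        self._queue.clear()
        return packets


def drain(com_if: ComInterface) -> list[bytes]:
    """Return all pending packets, or an empty list if nothing is waiting."""
    if com_if.is_open() and com_if.packets_available() > 0:
        return com_if.receive()
    return []
```

Calling code written against the abstract type, like `drain` here, does not need to know which transport sits underneath.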
- exception com_interface.ReceptionDecodeError(msg: str, custom_exception: None | Exception)
Bases: Exception
Generic decode error which can also wrap the exception thrown by other libraries.
- exception com_interface.SendError(msg: str, custom_exception: None | Exception)
Bases: Exception
Generic send error which can also wrap the exception thrown by other libraries.
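The wrapping behaviour described for these exceptions could look roughly like the sketch below. The `ReceptionDecodeError` class here is a stand-in that copies only the documented constructor signature. Storing the wrapped exception in a `custom_exception` attribute is an assumption, and `decode_length_prefixed` is a hypothetical decoder used purely for illustration.

```python
from typing import Optional


class ReceptionDecodeError(Exception):
    """Stand-in with the documented constructor signature (assumption)."""

    def __init__(self, msg: str, custom_exception: Optional[Exception]):
        super().__init__(msg)
        # Assumption: the wrapped exception is kept for later inspection.
        self.custom_exception = custom_exception


def decode_length_prefixed(frame: bytes) -> bytes:
    """Hypothetical decoder: the first byte holds the payload length."""
    try:
        length = frame[0]
        payload = frame[1 : 1 + length]
        if len(payload) != length:
            raise ValueError(f"expected {length} payload bytes, got {len(payload)}")
        return payload
    except (IndexError, ValueError) as exc:
        # Wrap the low-level error in the generic decode error.
        raise ReceptionDecodeError("frame decoding failed", exc) from exc
```

Callers can then catch a single exception type regardless of which library performed the decoding.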
UDP and TCP
UDP Communication Interface
- class com_interface.udp.UdpClient(com_if_id: str, send_address: EthAddr, recv_addr: None | EthAddr = None)
Bases: ComInterface
Communication interface for UDP communication.
Initialize a communication interface to send and receive UDP datagrams.
- Parameters:
send_address
recv_addr
- close(args: any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- initialize(args: Any | None = None) Any
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- open(args: Any | None = None) None
Opens the communication interface to allow communication.
- Returns:
- packets_available(parameters: Any = 0) bool
Poll whether packets are available.
- Parameters:
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
Number of packets available.
- receive(parameter: Any = 0) list[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.
- Returns:
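For orientation, polling a UDP socket for pending datagrams can be sketched with the standard library alone. This is not the implementation of `UdpClient`. The helper names `udp_packets_available` and `udp_receive_all` and the 4096 byte buffer size are assumptions of this example.

```python
import select
import socket


def udp_packets_available(sock: socket.socket, timeout: float = 0.0) -> bool:
    """Return True if at least one datagram can be read without blocking."""
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


def udp_receive_all(sock: socket.socket, max_size: int = 4096) -> list[bytes]:
    """Read every datagram that is currently pending on the socket."""
    packets = []
    while udp_packets_available(sock):
        data, _sender = sock.recvfrom(max_size)
        packets.append(data)
    return packets
```

A readiness check of this kind only tells whether something is pending, not how many datagrams are queued, which is consistent with the `bool` return type shown for `packets_available()` above.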
TCP Communication Interface
- class com_interface.tcp.TcpCommunicationType(value)
Bases: Enum
Parse for space packets in the TCP stream, using the space packet header.
- SPACE_PACKETS = 0
- class com_interface.tcp.TcpSpacepacketsClient(com_if_id: str, space_packet_ids: Sequence[PacketId], inner_thread_delay: float, target_address: EthAddr, max_packets_stored: int | None = None)
Bases: ComInterface
Communication interface for TCP communication. This particular interface expects raw space packets to be sent via TCP and uses a list of passed packet IDs to parse for them.
Initialize a communication interface to send and receive TMTC via TCP.
- Parameters:
com_if_id
space_packet_ids – Valid packet IDs for CCSDS space packets. Those will be used to parse for space packets inside the TCP stream.
inner_thread_delay – Delay between polling cycles of the TCP thread, in seconds.
- close(args: Any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- initialize(args: Any | None = None) None
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- open(args: Any | None = None) None
Opens the communication interface to allow communication.
- Returns:
- packets_available(parameters: Any = 0) int
Poll whether packets are available.
- Parameters:
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
Number of packets available.
- receive(parameters: float = 0) list[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.
- Returns:
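Parsing a TCP byte stream for space packets by packet ID, as described for this client, can be illustrated as follows. The sketch relies on the CCSDS primary header layout: six bytes, with the packet ID in the first two bytes and a data length field in the last two that holds the data field size minus one. Representing packet IDs as raw 16-bit integers and the function name `parse_space_packets` are assumptions of this example, and the documented `PacketId` type is not reproduced here.

```python
import struct

SPACE_PACKET_HEADER_LEN = 6


def parse_space_packets(buffer: bytearray, packet_ids: set[int]) -> list[bytes]:
    """Extract complete space packets from buffer and consume the parsed bytes.

    packet_ids holds raw 16-bit packet ID values (assumption of this sketch).
    """
    packets = []
    idx = 0
    while idx + SPACE_PACKET_HEADER_LEN <= len(buffer):
        packet_id, _seq_ctrl, data_len = struct.unpack_from(">HHH", buffer, idx)
        if packet_id not in packet_ids:
            # Not a known packet start: skip one byte and try to resync.
            idx += 1
            continue
        # The length field stores the data field size minus one.
        total_len = SPACE_PACKET_HEADER_LEN + data_len + 1
        if idx + total_len > len(buffer):
            # Packet is incomplete: keep it and wait for more stream data.
            break
        packets.append(bytes(buffer[idx : idx + total_len]))
        idx += total_len
    del buffer[:idx]
    return packets
```

Incomplete packets stay in the buffer, so the function can be called again after more stream data has been appended. A short list of valid packet IDs keeps the chance of a false match inside unrelated data low, but does not remove it.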
Serial
- class com_interface.serial_cobs.SerialCobsComIF(ser_cfg: SerialCfg)
Bases: SerialComBase, ComInterface
Serial communication interface which uses the COBS protocol to encode and decode packets.
This class will spin up a receiver thread on the open() call to poll for COBS encoded packets. It decodes all received COBS frames using cobs.cobs.decode(). This means that the close() call might block until the receiver thread has shut down.
- close(args: Any | None = None) None
Closes the ComIF and releases any held resources (for example a Communication Port).
- Returns:
- encode_data(data: bytes | bytearray) bytearray
Encodes the data using the COBS protocol.
- Parameters:
data – Data to encode.
- Returns:
Encoded data.
- initialize(args: Any | None = None) None
Performs initialization steps which cannot be done in the constructor or which require return values.
- is_open() bool
Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.
- open(args: Any | None = None) None
Spins up a receiver thread to permanently check for new COBS encoded packets.
- packets_available(parameters: Any = 0) int
Poll whether packets are available.
- Parameters:
parameters – Can be an arbitrary parameter.
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.
- Returns:
Number of packets available.
- receive(parameters: Any = 0) list[bytes]
Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.
- Parameters:
parameters
- Raises:
ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.
- Returns:
- send(data: bytes | bytearray) None
This function encodes all data using the cobs.cobs.encode() function.
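To illustrate what COBS encoding does to a packet, here is a small pure-Python sketch of the algorithm. It is not the `cobs` package used by this class, and its output for payloads that end on a full 254 byte block may differ from the library by one trailing byte. The function names and the use of a single trailing zero byte as frame delimiter are assumptions of this example.

```python
def cobs_encode(data: bytes) -> bytes:
    """Encode data so that the result contains no zero bytes."""
    out = bytearray()
    block = bytearray()
    for byte in data:
        if byte == 0:
            # Code byte gives the distance to the zero byte that was removed.
            out.append(len(block) + 1)
            out += block
            block.clear()
        else:
            block.append(byte)
            if len(block) == 254:
                # Full block: code 0xFF means no zero byte follows it.
                out.append(0xFF)
                out += block
                block.clear()
    out.append(len(block) + 1)
    out += block
    return bytes(out)


def cobs_decode(encoded: bytes) -> bytes:
    """Reverse cobs_encode; raises ValueError on malformed input."""
    out = bytearray()
    idx = 0
    while idx < len(encoded):
        code = encoded[idx]
        end = idx + code
        block = encoded[idx + 1 : end]
        if code == 0 or end > len(encoded) or 0 in block:
            raise ValueError("malformed COBS data")
        out += block
        idx = end
        if code < 0xFF and idx < len(encoded):
            out.append(0)  # Restore the zero byte the code byte replaced.
    return bytes(out)


def frame(data: bytes) -> bytes:
    """Encode data and append a zero delimiter (assumed framing)."""
    return cobs_encode(data) + b"\x00"


def split_frames(stream: bytes) -> list[bytes]:
    """Decode every complete, delimiter-terminated frame in stream."""
    *complete, _rest = stream.split(b"\x00")
    return [cobs_decode(chunk) for chunk in complete if chunk]
```

Because encoded data never contains a zero byte, a receiver can use zero bytes to find frame boundaries in the serial stream, which is what makes polling for complete packets possible.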