API Documentation

Communication module. Provides a generic abstraction for communication and commonly used concrete implementations.

class com_interface.ComInterface

Bases: ABC

Generic form of a communication interface to separate communication logic from the underlying interface.

abstract close(args: Any = 0) None

Closes the ComIF and releases any held resources (for example, a communication port).

Returns:

abstract property id: str

abstract initialize(args: Any = 0) Any

Performs initialization steps which cannot be done in the constructor or which require return values.

abstract is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

abstract open(args: Any = 0) None

Opens the communication interface to allow communication.

Returns:

abstract packets_available(parameters: Any = 0) int

Poll whether packets are available.

Parameters:

parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

Number of packets available.

abstract receive(parameters: Any = 0) list[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.

Returns:

abstract send(data: bytes | bytearray) None

Send raw data.

Raises:

SendError – Sending failed for some reason.
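
The abstract methods above can be satisfied by a small in-memory implementation. The following is a minimal sketch, not part of this module: ComInterface and SendError are redefined here as local stand-ins that mirror the documented signatures so the example runs without the package, LoopbackComIF is a hypothetical name, and the custom_exception attribute is an assumption.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Any, Optional, Union


class SendError(Exception):
    """Stand-in for the documented SendError (attribute name is an assumption)."""

    def __init__(self, msg: str, custom_exception: Optional[Exception]):
        super().__init__(msg)
        self.custom_exception = custom_exception


class ComInterface(ABC):
    """Stand-in mirroring the abstract members documented above."""

    @property
    @abstractmethod
    def id(self) -> str: ...

    @abstractmethod
    def initialize(self, args: Any = 0) -> Any: ...

    @abstractmethod
    def open(self, args: Any = 0) -> None: ...

    @abstractmethod
    def is_open(self) -> bool: ...

    @abstractmethod
    def close(self, args: Any = 0) -> None: ...

    @abstractmethod
    def send(self, data: Union[bytes, bytearray]) -> None: ...

    @abstractmethod
    def packets_available(self, parameters: Any = 0) -> int: ...

    @abstractmethod
    def receive(self, parameters: Any = 0) -> list: ...


class LoopbackComIF(ComInterface):
    """Hypothetical interface which hands every sent packet back on receive()."""

    def __init__(self, com_if_id: str):
        self._id = com_if_id
        self._open = False
        self._queue = deque()  # packets waiting to be returned by receive()

    @property
    def id(self) -> str:
        return self._id

    def initialize(self, args: Any = 0) -> Any:
        return None  # nothing to set up for an in-memory queue

    def open(self, args: Any = 0) -> None:
        self._open = True

    def is_open(self) -> bool:
        return self._open

    def close(self, args: Any = 0) -> None:
        self._open = False
        self._queue.clear()

    def send(self, data: Union[bytes, bytearray]) -> None:
        if not self._open:
            raise SendError("interface is not open", None)
        self._queue.append(bytes(data))

    def packets_available(self, parameters: Any = 0) -> int:
        return len(self._queue)

    def receive(self, parameters: Any = 0) -> list:
        packets = list(self._queue)
        self._queue.clear()
        return packets


com_if = LoopbackComIF("loopback")
com_if.initialize()
com_if.open()
com_if.send(b"\x01\x02")
if com_if.packets_available() > 0:
    received = com_if.receive()
com_if.close()
```

Storing packets in a deque follows the suggestion in the receive() description; a threaded implementation would fill the same container from its receiver thread.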

exception com_interface.ReceptionDecodeError(msg: str, custom_exception: None | Exception)

Bases: Exception

Generic decode error which can also wrap the exception thrown by other libraries.

exception com_interface.SendError(msg: str, custom_exception: None | Exception)

Bases: Exception

Generic send error which can also wrap the exception thrown by other libraries.
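
Both exceptions accept a message and an optional wrapped exception. The sketch below illustrates how a decoding step might wrap a lower-level error. It assumes a local stand-in for ReceptionDecodeError with the documented constructor signature; the custom_exception attribute name, the decode_frame helper, and the use of UTF-8 decoding in place of a third-party decoder are all hypothetical.

```python
from typing import Optional


class ReceptionDecodeError(Exception):
    """Stand-in with the documented constructor signature."""

    def __init__(self, msg: str, custom_exception: Optional[Exception]):
        super().__init__(msg)
        # Attribute name is an assumption made for this sketch.
        self.custom_exception = custom_exception


def decode_frame(frame: bytes) -> str:
    """Hypothetical decoder: UTF-8 stands in for a library decode call."""
    try:
        return frame.decode("utf-8")
    except UnicodeDecodeError as e:
        # Wrap the library-specific error in the generic one.
        raise ReceptionDecodeError("frame could not be decoded", e) from e


try:
    decode_frame(b"\xff")
except ReceptionDecodeError as e:
    wrapped = e.custom_exception  # the original UnicodeDecodeError
```

Callers then only need to handle the generic exception type, while the original error stays available for diagnostics.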

UDP and TCP

UDP Communication Interface

class com_interface.udp.UdpClient(com_if_id: str, send_address: EthAddr, recv_addr: None | EthAddr = None)

Bases: ComInterface

Communication interface for UDP communication.

Initialize a communication interface to send and receive UDP datagrams.

Parameters:
  • send_address

  • recv_addr

close(args: any | None = None) None

Closes the ComIF and releases any held resources (for example, a communication port).

Returns:

property id: str

initialize(args: Any | None = None) Any

Performs initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: Any | None = None) None

Opens the communication interface to allow communication.

Returns:

packets_available(parameters: Any = 0) bool

Poll whether packets are available.

Parameters:

parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

Number of packets available.

receive(parameter: Any = 0) list[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.

Returns:

send(data: bytes | bytearray) None

Send raw data.

Raises:

SendError – Sending failed for some reason.

TCP Communication Interface

class com_interface.tcp.TcpCommunicationType(value)

Bases: Enum

Parse for space packets in the TCP stream, using the space packet header.

SPACE_PACKETS = 0

class com_interface.tcp.TcpSpacepacketsClient(com_if_id: str, space_packet_ids: Sequence[PacketId], inner_thread_delay: float, target_address: EthAddr, max_packets_stored: int | None = None)

Bases: ComInterface

Communication interface for TCP communication. This particular interface expects raw space packets to be sent via TCP and uses a list of passed packet IDs to parse for them.

Initialize a communication interface to send and receive TMTC via TCP.

Parameters:
  • com_if_id

  • space_packet_ids – Valid packet IDs for CCSDS space packets. These are used to parse for space packets inside the TCP stream.

  • inner_thread_delay – Polling interval of the TCP thread in seconds.
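
Parsing a byte stream for space packets by packet ID can be sketched as follows. This is an illustration under stated assumptions and not the parser used by TcpSpacepacketsClient: parse_space_packets is a hypothetical helper, and packet IDs are passed as plain integers rather than PacketId objects. It relies on the CCSDS space packet primary header layout: 6 bytes, with the packet ID in the lower 13 bits of the first two bytes and a data length field in the last two bytes which holds the data field length minus one.

```python
import struct

HEADER_LEN = 6  # size of the CCSDS space packet primary header


def parse_space_packets(stream: bytearray, packet_ids: set) -> list:
    """Extract complete space packets from stream and drop consumed bytes.

    Bytes which do not start a packet with a known ID are skipped. An
    incomplete packet at the end is left in the buffer for the next call.
    """
    packets = []
    idx = 0
    while idx + HEADER_LEN <= len(stream):
        first_word, _seq_ctrl, data_len = struct.unpack_from("!HHH", stream, idx)
        if (first_word & 0x1FFF) in packet_ids:
            total_len = HEADER_LEN + data_len + 1
            if idx + total_len > len(stream):
                break  # wait until the rest of the packet has arrived
            packets.append(bytes(stream[idx:idx + total_len]))
            idx += total_len
        else:
            idx += 1  # no known packet ID here, shift by one byte
    del stream[:idx]
    return packets


# Packet ID 0x0801, sequence control 0xC000, three data bytes (length field 2).
example_packet = struct.pack("!HHH", 0x0801, 0xC000, 2) + b"\x01\x02\x03"
buffer = bytearray(b"\xff\xff" + example_packet)
found = parse_space_packets(buffer, {0x0801})
```

Matching on known packet IDs lets the parser resynchronize after unrelated bytes, at the cost of possible false matches when the same bit pattern appears inside other data.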

close(args: Any | None = None) None

Closes the ComIF and releases any held resources (for example, a communication port).

Returns:

property id: str

initialize(args: Any | None = None) None

Performs initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: Any | None = None) None

Opens the communication interface to allow communication.

Returns:

packets_available(parameters: Any = 0) int

Poll whether packets are available.

Parameters:

parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

Number of packets available.

receive(parameters: float = 0) list[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.

Returns:

send(data: bytes | bytearray) None

Send raw data.

Raises:

SendError – Sending failed for some reason.

class com_interface.ip_utils.EthAddr(ip_addr: 'str', port: 'int')

Bases: object

classmethod from_tuple(addr: tuple[str, int]) EthAddr
ip_addr: str
port: int
property to_tuple: tuple[str, int]
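
The EthAddr members listed above suggest a small value type which converts to and from the (host, port) tuples used by the standard socket module. The following is a minimal sketch assuming a dataclass-based stand-in with the documented fields; the real class may be implemented differently.

```python
from dataclasses import dataclass


@dataclass
class EthAddr:
    """Stand-in mirroring the documented fields and members."""

    ip_addr: str
    port: int

    @classmethod
    def from_tuple(cls, addr: tuple) -> "EthAddr":
        # addr is expected to be an (ip_addr, port) pair.
        return cls(addr[0], addr[1])

    @property
    def to_tuple(self) -> tuple:
        return (self.ip_addr, self.port)


send_addr = EthAddr.from_tuple(("127.0.0.1", 7301))
address_pair = send_addr.to_tuple  # property access, no call parentheses
```

Because to_tuple is documented as a property, it is read without parentheses, and the result has the shape expected by calls such as socket.sendto().
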
class com_interface.ip_utils.TcpIpConfigIds(value)

Bases: Enum

An enumeration.

RECV_ADDRESS = 2
RECV_MAX_SIZE = 3
SEND_ADDRESS = 1
SPACE_PACKET_ID = 4

class com_interface.ip_utils.TcpIpType(value)

Bases: Enum

An enumeration.

TCP = 1
UDP = 2
UDP_RECV = 3

Serial

class com_interface.serial_cobs.SerialCobsComIF(ser_cfg: SerialCfg)

Bases: SerialComBase, ComInterface

Serial communication interface which uses the COBS protocol to encode and decode packets.

This class spins up a receiver thread on the open() call to poll for COBS-encoded packets, so the close() call might block until the receiver thread has shut down. All received COBS frames are decoded using cobs.cobs.decode().

clear() None

close(args: Any | None = None) None

Closes the ComIF and releases any held resources (for example, a communication port).

Returns:

encode_data(data: bytes | bytearray) bytearray

Encodes the data using the COBS protocol.

Parameters:

data – Data to encode.

Returns:

Encoded data.

property id: str

initialize(args: Any | None = None) None

Performs initialization steps which cannot be done in the constructor or which require return values.

is_open() bool

Can be used to check whether the communication interface is open. This is useful if opening a COM interface takes a long time and is non-blocking.

open(args: Any | None = None) None

Spins up a receiver thread to continuously check for new COBS-encoded packets.

packets_available(parameters: Any = 0) int

Poll whether packets are available.

Parameters:

parameters – Can be an arbitrary parameter.

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding when determining the number of available packets, this exception can be thrown on decoding errors.

Returns:

Number of packets available.

receive(parameters: Any = 0) list[bytes]

Returns a list of received packets. The child class can use a separate thread to poll for the packets or use some other mechanism and container like a deque to store packets to be returned here.

Parameters:

parameters

Raises:

ReceptionDecodeError – If the underlying COM interface uses encoding and decoding and the decoding fails, this exception is raised.

Returns:

send(data: bytes | bytearray) None

This function encodes all data using the cobs.cobs.encode() function.
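
To illustrate what the COBS encoding referenced above does, the following is a pure-Python sketch of the algorithm. It is not the code used by SerialCobsComIF, which calls cobs.cobs.encode() and cobs.cobs.decode() from the third-party cobs package; cobs_encode and cobs_decode are hypothetical names introduced for this example.

```python
def cobs_encode(data: bytes) -> bytes:
    """Encode data so that the result contains no zero bytes."""
    out = bytearray()
    block = bytearray()  # non-zero bytes collected since the last code byte
    pending = True  # whether a final code byte still has to be written
    for byte in data:
        pending = True
        if byte == 0:
            # The code byte is the distance to the next zero position.
            out.append(len(block) + 1)
            out += block
            block.clear()
        else:
            block.append(byte)
            if len(block) == 254:
                # Full block: code 0xFF means no implicit zero follows.
                out.append(0xFF)
                out += block
                block.clear()
                pending = False
    if pending:
        out.append(len(block) + 1)
        out += block
    return bytes(out)


def cobs_decode(encoded: bytes) -> bytes:
    """Reverse cobs_encode. Raises ValueError on malformed input."""
    out = bytearray()
    idx = 0
    while idx < len(encoded):
        code = encoded[idx]
        end = idx + code
        if code == 0 or end > len(encoded):
            raise ValueError("malformed COBS data")
        block = encoded[idx + 1:end]
        if 0 in block:
            raise ValueError("zero byte inside COBS block")
        out += block
        idx = end
        # Every block except a full one, or the last one, implies a zero.
        if code < 0xFF and idx < len(encoded):
            out.append(0)
    return bytes(out)


encoded = cobs_encode(b"\x11\x22\x00\x33")
decoded = cobs_decode(encoded)
```

Since the encoded output never contains a zero byte, a zero byte can serve as an unambiguous frame delimiter on a serial line, which is the usual reason to pair COBS with serial transport.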