Base module

PacketStream

Processes and broadcasts IPackets through a configurable adapter graph.

PacketStream

#include <icy/packetstream.h>

Inherits: Stateful< PacketStreamState >

Processes and broadcasts IPackets through a configurable adapter graph.

A PacketStream consists of one or many PacketSources, one or many PacketProcessors, and one or many delegate receivers.

This class enables the developer to setup a processor chain in order to perform arbitrary processing on data packets using interchangeable packet adapters, and pump the output to any delegate function or even another PacketStream.

Note that PacketStream itself inherits from PacketStreamAdapter, so a PacketStream can be the source of another PacketStream.

All PacketStream methods are thread-safe, but once the stream is running you will not be able to attach or detach stream adapters.

In order to synchronize output packets with the application event loop take a look at the SyncPacketQueue class. For lengthy operations you can add an AsyncPacketQueue to the start of the stream to defer processing from the PacketSource thread.

Public Attributes

ReturnNameDescription
PacketSignalemitterSignals to delegates on outgoing packets.
ThreadSignal< void(PacketStream &, const std::exception_ptr &)>ErrorSignals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput()
ThreadSignal< void(PacketStream &)>CloseSignals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context.

emitter

PacketSignal emitter

Signals to delegates on outgoing packets.


Error

ThreadSignal< void(PacketStream &, const std::exception_ptr &)> Error

Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput()


Close

ThreadSignal< void(PacketStream &)> Close

Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context.

Public Methods

ReturnNameDescription
PacketStreamConstruct a named packet stream.
~PacketStream virtualDestroy the stream; calls close() then reset() to release all adapters.
PacketStreamDeleted constructor.
PacketStreamDeleted constructor.
voidstart virtualStart the stream and synchronized sources.
voidstop virtualStop the stream and synchronized sources.
voidpause virtualPause the stream.
voidresume virtualResume the stream.
voidclose virtualClose the stream and transition the internal state to Closed.
voidreset virtualCleanup all managed stream adapters and reset the stream state.
boolactive virtual constReturns true when the stream is in the Active state.
boolstopped virtual constReturns true when the stream is in the Stopping or Stopped state.
boolclosed virtual constReturns true when the stream is in the Closed or Error state.
boollock virtualSets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped.
boollocked virtual constReturns true is the stream is currently locked.
voidwrite virtualWrite a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns.
voidwrite virtualWrite a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it.
voidwrite virtualWrite a packet directly into the processing chain.
voidattachSource virtualAttach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter.
voidattachSource virtualAttach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained.
voidattachSource inlineAttach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter.
booldetachSource virtualDetach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry.
booldetachSource virtualDetach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry.
voidattach virtualAttach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queue/processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary.
voidattach inlineAttach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor.
booldetach virtualDetach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held.
voidsynchronizeOutput virtualSynchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start().
voidautoStart virtualEnable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start().
voidcloseOnError virtualEnable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically.
const std::exception_ptr &errorAccessors for the unmanaged client data pointer.
std::stringname constReturn the name assigned to this stream at construction.
PacketAdapterVecadapters constReturns a combined list of all stream sources and processors.
PacketAdapterVecsources constReturns a list of all stream sources.
PacketAdapterVecprocessors constReturns a list of all stream processors.
intnumSources constReturn the number of source adapters currently registered.
intnumProcessors constReturn the number of processor adapters currently registered.
intnumAdapters constReturn the total number of adapters (sources + processors).
AdapterT *getSource inlineReturn the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
AdapterT *getProcessor inlineReturn the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
PacketProcessor *getProcessor inlineReturn the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index.

PacketStream

PacketStream(const std::string & name)

Construct a named packet stream.

Parameters

  • name Optional human-readable name used in log output.

~PacketStream

virtual

virtual ~PacketStream()

Destroy the stream; calls close() then reset() to release all adapters.


PacketStream

PacketStream(const PacketStream &) = delete

Deleted constructor.


PacketStream

PacketStream(PacketStream &&) = delete

Deleted constructor.


start

virtual

virtual void start()

Start the stream and synchronized sources.


stop

virtual

virtual void stop()

Stop the stream and synchronized sources.


pause

virtual

virtual void pause()

Pause the stream.


resume

virtual

virtual void resume()

Resume the stream.


close

virtual

virtual void close()

Close the stream and transition the internal state to Closed.


reset

virtual

virtual void reset()

Cleanup all managed stream adapters and reset the stream state.


active

virtual const

virtual bool active() const

Returns true when the stream is in the Active state.


stopped

virtual const

virtual bool stopped() const

Returns true when the stream is in the Stopping or Stopped state.


closed

virtual const

virtual bool closed() const

Returns true when the stream is in the Closed or Error state.


lock

virtual

virtual bool lock()

Sets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped.


locked

virtual const

virtual bool locked() const

Returns true is the stream is currently locked.


write

virtual

virtual void write(char * data, size_t len)

Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns.

Parameters

  • data Pointer to the raw data buffer.

  • len Number of bytes to process.


write

virtual

virtual void write(const char * data, size_t len)

Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it.

Parameters

  • data Pointer to the raw data buffer.

  • len Number of bytes to process.


write

virtual

virtual void write(IPacket && packet)

Write a packet directly into the processing chain.

Parameters

  • packet Packet to process; moved into the stream.

attachSource

virtual

virtual void attachSource(PacketSignal & source)

Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter.

Parameters

  • source The packet signal to attach; must outlive the stream.

attachSource

virtual

virtual void attachSource(PacketStreamAdapter * source, bool owned, bool syncState)

Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained.

Parameters

  • source The adapter to attach; must not be null.

  • owned If true the stream takes ownership and deletes the pointer on teardown.

  • syncState If true and source implements basic::Startable, its start()/stop() will be called by startSources()/stopSources().


attachSource

inline

template<class C> inline void attachSource(std::shared_ptr< C > ptr, bool syncState)

Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter.

Parameters

Parameters


detachSource

virtual

virtual bool detachSource(PacketSignal & source)

Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry.

Parameters

Returns

true if the source was found and removed, false otherwise.


detachSource

virtual

virtual bool detachSource(PacketStreamAdapter * source)

Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry.

Parameters

  • source Pointer to the adapter previously attached.

Returns

true if the source was found and removed, false otherwise.


attach

virtual

virtual void attach(PacketProcessor * proc, int order, bool owned)

Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queue/processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary.

Parameters

  • proc The processor to attach; must not be null.

  • order Position in the processing chain (lower runs first).

  • owned If true the stream takes ownership and deletes the pointer on teardown.


attach

inline

template<class C> inline void attach(std::shared_ptr< C > ptr, int order, bool syncState)

Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor.

Parameters

Parameters

  • ptr Shared pointer to the processor instance.

  • order Position in the processing chain (lower runs first).

  • syncState Reserved for future use; currently unused.


detach

virtual

virtual bool detach(PacketProcessor * proc)

Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held.

Parameters

  • proc Pointer to the processor to remove.

Returns

true if the processor was found and removed, false otherwise.


synchronizeOutput

virtual

virtual void synchronizeOutput(uv::Loop * loop)

Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start().

Parameters

  • loop The event loop to synchronize output onto; must not be null.

autoStart

virtual

virtual void autoStart(bool flag)

Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start().

Parameters

  • flag true to enable auto-start, false to disable.

closeOnError

virtual

virtual void closeOnError(bool flag)

Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically.

Parameters

  • flag true to close the stream on error, false to remain in Error state.

error

const std::exception_ptr & error()

Accessors for the unmanaged client data pointer.

Return the last captured exception, if the stream is in Error state. The pointer is null when no error has occurred.

Returns

A reference to the stored exception_ptr; empty if no error.


name

const

std::string name() const

Return the name assigned to this stream at construction.

Returns

The stream name; empty string if none was provided.


adapters

const

PacketAdapterVec adapters() const

Returns a combined list of all stream sources and processors.


sources

const

PacketAdapterVec sources() const

Returns a list of all stream sources.


processors

const

PacketAdapterVec processors() const

Returns a list of all stream processors.


numSources

const

int numSources() const

Return the number of source adapters currently registered.

Returns

Source count; thread-safe.


numProcessors

const

int numProcessors() const

Return the number of processor adapters currently registered.

Returns

Processor count; thread-safe.


numAdapters

const

int numAdapters() const

Return the total number of adapters (sources + processors).

Returns

Combined adapter count; thread-safe.


getSource

inline

template<class AdapterT> inline AdapterT * getSource(int index)

Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.

Parameters

Parameters

  • index Zero-based index among matching sources (default 0).

Returns

Pointer to the matching adapter, or nullptr.


getProcessor

inline

template<class AdapterT> inline AdapterT * getProcessor(int index)

Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.

Parameters

Parameters

  • index Zero-based index among matching processors (default 0).

Returns

Pointer to the matching processor, or nullptr.


getProcessor

inline

inline PacketProcessor * getProcessor(int order)

Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index.

Parameters

  • order The order value to match (default 0).

Returns

Pointer to the matching processor, or nullptr if none registered at that order.

Protected Attributes

ReturnNameDescription
std::mutex_mutex
std::mutex_procMutex
std::string_name
PacketAdapterVec_sources
PacketAdapterVec_processors
std::deque< PacketStreamState >_states
std::exception_ptr_error
bool_autoStart
bool_closeOnError
bool_wired

_mutex

std::mutex _mutex

_procMutex

std::mutex _procMutex

_name

std::string _name

_sources

PacketAdapterVec _sources

_processors

PacketAdapterVec _processors

_states

std::deque< PacketStreamState > _states

_error

std::exception_ptr _error

_autoStart

bool _autoStart

_closeOnError

bool _closeOnError

_wired

bool _wired

Protected Methods

ReturnNameDescription
voidsetupAttach the source and processor delegate chain.
voidteardownDetach the source and processor delegate chain.
voidattachSource
voidattach
voidstartSourcesStart synchronized sources.
voidstopSourcesStop synchronized sources.
voidprocess virtualProcess incoming packets.
voidemitEmit the final packet to listeners.
voidsynchronizeStatesSynchronize queued states with adapters.
voidonStateChange virtualOverride the Stateful::onStateChange method.
voidassertCanModifyReturns true if the given state ID is queued.
voidhandleExceptionHandle an internal exception.

setup

void setup()

Attach the source and processor delegate chain.


teardown

void teardown()

Detach the source and processor delegate chain.


attachSource

void attachSource(PacketAdapterReference::Ptr ref)

attach

void attach(PacketAdapterReference::Ptr ref)

startSources

void startSources()

Start synchronized sources.


stopSources

void stopSources()

Stop synchronized sources.


process

virtual

virtual void process(IPacket & packet)

Process incoming packets.


emit

void emit(IPacket & packet)

Emit the final packet to listeners.

Synchronized signals such as Close and Error are sent from this method. See synchronizeOutput()


synchronizeStates

void synchronizeStates()

Synchronize queued states with adapters.


onStateChange

virtual

virtual void onStateChange(PacketStreamState & state, const PacketStreamState & oldState)

Override the Stateful::onStateChange method.


assertCanModify

void assertCanModify()

Returns true if the given state ID is queued.

Asserts that the stream can be modified, ie is not in the Locked, Stopping or Active states.


handleException

void handleException(std::exception & exc)

Handle an internal exception.

Public Types

NameDescription
Ptr

Ptr

std::shared_ptr< PacketStream > Ptr()