#include <icy/packetstream.h>
Inherits: Stateful< PacketStreamState >
Processes and broadcasts IPackets through a configurable adapter graph.
A PacketStream consists of one or many PacketSources, one or many PacketProcessors, and one or many delegate receivers.
This class lets the developer set up a processor chain that performs arbitrary processing on data packets using interchangeable packet adapters, and pump the output to any delegate function or even another PacketStream.
Note that PacketStream itself inherits from PacketStreamAdapter, so a PacketStream can be the source of another PacketStream.
All PacketStream methods are thread-safe, but once the stream is running you will not be able to attach or detach stream adapters.
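The source, processor and receiver arrangement described above can be sketched with the standard library alone. This is a minimal illustration, not the icy API: `MiniStream`, `connect` and `chainDemo` are hypothetical names, and packets are modelled as plain strings. It shows ordered processing, delivery to delegates, one stream feeding another, and the rule that adapters cannot change once the stream is running.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a packet stream; NOT the icy::PacketStream API.
class MiniStream {
public:
    using Processor = std::function<std::string(const std::string&)>;
    using Receiver = std::function<void(const std::string&)>;

    // Register a processing stage; lower order values run first.
    void attach(int order, Processor proc) {
        assert(!running_ && "adapters cannot change while running");
        stages_.emplace_back(order, std::move(proc));
        std::stable_sort(stages_.begin(), stages_.end(),
                         [](const auto& a, const auto& b) { return a.first < b.first; });
    }

    // Register a delegate that receives the final output.
    void connect(Receiver receiver) { receivers_.push_back(std::move(receiver)); }

    void start() { running_ = true; }

    // Push one packet through every stage, then broadcast the result.
    void write(std::string packet) {
        if (!running_) return;
        for (auto& stage : stages_) packet = stage.second(packet);
        for (auto& receiver : receivers_) receiver(packet);
    }

private:
    std::vector<std::pair<int, Processor>> stages_;
    std::vector<Receiver> receivers_;
    bool running_ = false;
};

// Chain two streams: the output of `first` becomes the input of `second`.
inline std::string chainDemo() {
    MiniStream first, second;
    std::string out;
    first.attach(0, [](const std::string& p) { return p + "-a"; });
    second.attach(0, [](const std::string& p) { return p + "-b"; });
    first.connect([&second](const std::string& p) { second.write(p); });
    second.connect([&out](const std::string& p) { out = p; });
    first.start();
    second.start();
    first.write("pkt");
    return out;
}
```

Calling `chainDemo()` passes the packet through both chains in turn, so the receiver on the second stream sees the output of both processors.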
To synchronize output packets with the application event loop, see the SyncPacketQueue class. For lengthy operations you can add an AsyncPacketQueue to the start of the stream to defer processing from the PacketSource thread.
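The idea behind a synchronizing queue is that the producing side only enqueues, and delivery happens later when the consuming side drains the queue from its own context. The sketch below illustrates that hand-off under simplifying assumptions: `HandoffQueue`, `push` and `drain` are hypothetical names, packets are strings, and the real SyncPacketQueue is driven by an event loop rather than an explicit `drain()` call.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical hand-off queue; illustrates deferral only, not SyncPacketQueue.
class HandoffQueue {
public:
    // Called from the producing (source) side: enqueue and return immediately.
    void push(std::string packet) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push(std::move(packet));
    }

    // Called from the consuming (loop) side: deliver everything queued so far.
    // Returns the number of packets dispatched.
    std::size_t drain(const std::function<void(const std::string&)>& receiver) {
        std::queue<std::string> batch;
        {
            // Hold the lock only while swapping, not while running receivers.
            std::lock_guard<std::mutex> lock(mutex_);
            std::swap(batch, pending_);
        }
        std::size_t count = 0;
        for (; !batch.empty(); batch.pop(), ++count) receiver(batch.front());
        return count;
    }

private:
    std::mutex mutex_;
    std::queue<std::string> pending_;
};

// Packets pushed earlier are delivered, in order, only when drain() runs.
inline std::vector<std::string> handoffDemo() {
    HandoffQueue queue;
    std::vector<std::string> seen;
    queue.push("a");
    queue.push("b");
    queue.drain([&seen](const std::string& p) { seen.push_back(p); });
    return seen;
}
```

Swapping the pending queue out under the lock keeps the critical section short, so a slow receiver does not block the producer.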
| Return | Name | Description |
|---|---|---|
PacketSignal | emitter | Signals to delegates on outgoing packets. |
ThreadSignal< void(PacketStream &, const std::exception_ptr &)> | Error | Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput() |
ThreadSignal< void(PacketStream &)> | Close | Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context. |

PacketSignal emitter
Signals to delegates on outgoing packets.
ThreadSignal< void(PacketStream &, const std::exception_ptr &)> Error
Signals that the PacketStream is in Error state. If stream output is synchronized then the Error signal will be sent from the synchronization context, otherwise it will be sent from the async processor context. See synchronizeOutput().
ThreadSignal< void(PacketStream &)> Close
Signals that the PacketStream is in Close state. This signal is sent immediately via the close() method, and as such will be sent from the calling thread context.

| Return | Name | Description |
|---|---|---|
| | PacketStream | Construct a named packet stream. |
| | ~PacketStream virtual | Destroy the stream; calls close() then reset() to release all adapters. |
| | PacketStream | Deleted copy constructor. |
| | PacketStream | Deleted move constructor. |
void | start virtual | Start the stream and synchronized sources. |
void | stop virtual | Stop the stream and synchronized sources. |
void | pause virtual | Pause the stream. |
void | resume virtual | Resume the stream. |
void | close virtual | Close the stream and transition the internal state to Closed. |
void | reset virtual | Clean up all managed stream adapters and reset the stream state. |
bool | active virtual const | Returns true when the stream is in the Active state. |
bool | stopped virtual const | Returns true when the stream is in the Stopping or Stopped state. |
bool | closed virtual const | Returns true when the stream is in the Closed or Error state. |
bool | lock virtual | Sets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped. |
bool | locked virtual const | Returns true if the stream is currently locked. |
void | write virtual | Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns. |
void | write virtual | Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it. |
void | write virtual | Write a packet directly into the processing chain. |
void | attachSource virtual | Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter. |
void | attachSource virtual | Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained. |
void | attachSource inline | Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter. |
bool | detachSource virtual | Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry. |
bool | detachSource virtual | Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry. |
void | attach virtual | Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queues and other processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary. |
void | attach inline | Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor. |
bool | detach virtual | Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held. |
void | synchronizeOutput virtual | Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start(). |
void | autoStart virtual | Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start(). |
void | closeOnError virtual | Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically. |
const std::exception_ptr & | error | Return the last captured exception, if the stream is in Error state. The pointer is null when no error has occurred. |
std::string | name const | Return the name assigned to this stream at construction. |
PacketAdapterVec | adapters const | Returns a combined list of all stream sources and processors. |
PacketAdapterVec | sources const | Returns a list of all stream sources. |
PacketAdapterVec | processors const | Returns a list of all stream processors. |
int | numSources const | Return the number of source adapters currently registered. |
int | numProcessors const | Return the number of processor adapters currently registered. |
int | numAdapters const | Return the total number of adapters (sources + processors). |
AdapterT * | getSource inline | Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
AdapterT * | getProcessor inline | Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted. |
PacketProcessor * | getProcessor inline | Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index. |

PacketStream(const std::string & name)
Construct a named packet stream.
name: Optional human-readable name used in log output.
virtual
virtual ~PacketStream()
Destroy the stream; calls close() then reset() to release all adapters.
PacketStream(const PacketStream &) = delete
Deleted copy constructor.
PacketStream(PacketStream &&) = delete
Deleted move constructor.
virtual
virtual void start()
Start the stream and synchronized sources.
virtual
virtual void stop()
Stop the stream and synchronized sources.
virtual
virtual void pause()
Pause the stream.
virtual
virtual void resume()
Resume the stream.
virtual
virtual void close()
Close the stream and transition the internal state to Closed.
virtual
virtual void reset()
Clean up all managed stream adapters and reset the stream state.
virtual const
virtual bool active() const
Returns true when the stream is in the Active state.
virtual const
virtual bool stopped() const
Returns true when the stream is in the Stopping or Stopped state.
virtual const
virtual bool closed() const
Returns true when the stream is in the Closed or Error state.
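The three predicates above each cover one or two states. A small sketch of that mapping follows; the `StreamState` enum and the free functions are hypothetical and only mirror the state names mentioned in this reference, not the actual PacketStreamState type.

```cpp
#include <cassert>

// Hypothetical enum using the state names mentioned in this reference.
enum class StreamState { None, Locked, Active, Stopping, Stopped, Closed, Error };

// Mirrors active(): only the Active state counts.
inline bool isActive(StreamState s) { return s == StreamState::Active; }

// Mirrors stopped(): a stream that is stopping already counts as stopped.
inline bool isStopped(StreamState s) {
    return s == StreamState::Stopping || s == StreamState::Stopped;
}

// Mirrors closed(): an errored stream is reported as closed too.
inline bool isClosed(StreamState s) {
    return s == StreamState::Closed || s == StreamState::Error;
}
```

Note that under this mapping a stream in the Error state is reported as closed, so callers that need to tell the two apart would have to inspect the error itself.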
virtual
virtual bool lock()
Sets the stream to locked state. In a locked state no new adapters can be added or removed from the stream until the stream is stopped.
virtual const
virtual bool locked() const
Returns true if the stream is currently locked.
virtual
virtual void write(char * data, size_t len)
Write a mutable buffer into the stream without copying. The caller must keep the buffer alive until processing crosses a Cloned/Retained boundary or, if the graph is fully synchronous, until write() returns.
data: Pointer to the raw data buffer.
len: Number of bytes to process.
virtual
virtual void write(const char * data, size_t len)
Write a read-only buffer into the stream; data is copied immediately into an owning RawPacket before any adapter sees it.
data: Pointer to the raw data buffer.
len: Number of bytes to process.
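The difference between the two raw-buffer overloads is whether the stream sees the caller's memory or its own copy. The sketch below illustrates that distinction with hypothetical types (`BorrowedView`, `OwnedBuffer`); it does not use RawPacket or any icy type, and only demonstrates why a borrowed buffer must stay alive and unchanged while it is in flight.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical non-owning view: refers to the caller's buffer, no copy made.
struct BorrowedView {
    char* data;
    std::size_t len;
};

// Hypothetical owning buffer: holds its own copy of the bytes.
struct OwnedBuffer {
    std::vector<char> bytes;
};

// Analogue of the mutable-buffer path: wrap the pointer without copying.
inline BorrowedView wrap(char* data, std::size_t len) { return {data, len}; }

// Analogue of the read-only path: copy the bytes immediately.
inline OwnedBuffer copyOf(const char* data, std::size_t len) {
    return {std::vector<char>(data, data + len)};
}

// A borrowed view observes later writes to the caller's buffer.
inline bool borrowedSeesLaterChange() {
    char buf[] = "abc";
    BorrowedView view = wrap(buf, 3);
    buf[0] = 'x';
    return view.data[0] == 'x';
}

// An owned copy is unaffected by later writes to the caller's buffer.
inline bool ownedSeesLaterChange() {
    char buf[] = "abc";
    OwnedBuffer owned = copyOf(buf, 3);
    buf[0] = 'x';
    return owned.bytes[0] == 'x';
}
```

The copying path costs an allocation per call but removes the lifetime obligation from the caller, which is the trade-off the two overloads expose.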
virtual
virtual void write(IPacket && packet)
Write a packet directly into the processing chain.
packet: Packet to process; moved into the stream.
virtual
virtual void attachSource(PacketSignal & source)
Attach a bare packet signal as a stream source. The signal is wrapped in an unowned PacketStreamAdapter internally. Useful when the source is another PacketStream::emitter.
source: The packet signal to attach; must outlive the stream.
virtual
virtual void attachSource(PacketStreamAdapter * source, bool owned, bool syncState)
Attach a PacketStreamAdapter as a source. Source adapters default to Borrowed retention unless overridden; they must not retain inbound packet storage asynchronously without reporting Cloned or Retained.
source: The adapter to attach; must not be null.
owned: If true the stream takes ownership and deletes the pointer on teardown.
syncState: If true and source implements basic::Startable, its start()/stop() will be called by startSources()/stopSources().
inline
template<class C> inline void attachSource(std::shared_ptr< C > ptr, bool syncState)
Attach a shared_ptr-managed source adapter to the stream. The stream shares ownership; the adapter is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketStreamAdapter.
C: Adapter type; must derive from PacketStreamAdapter.
ptr: Shared pointer to the adapter instance.
syncState: If true and ptr implements basic::Startable, its start()/stop() will be called by startSources()/stopSources().
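The `owned` flag decides whether teardown deletes the adapter. One common way to express optional ownership, shown here purely as an illustration, is a `std::shared_ptr` whose deleter does nothing for unowned pointers. The `Adapter` type and `makeRef` helper are hypothetical; how PacketAdapterReference actually stores the pointer is not stated in this reference.

```cpp
#include <cassert>
#include <memory>

// Hypothetical adapter that counts destructions through an external counter.
struct Adapter {
    explicit Adapter(int* counter) : destroyed(counter) {}
    ~Adapter() { ++*destroyed; }
    int* destroyed;
};

// Wrap a raw pointer. When `owned` is false the deleter is a no-op, so
// releasing the reference leaves the adapter alive for the caller to manage.
inline std::shared_ptr<Adapter> makeRef(Adapter* raw, bool owned) {
    if (owned) return std::shared_ptr<Adapter>(raw);
    return std::shared_ptr<Adapter>(raw, [](Adapter*) {});
}

// Returns how many adapters were destroyed when the reference went away.
inline int destroyedAfterTeardown(bool owned) {
    int destroyed = 0;
    Adapter* raw = new Adapter(&destroyed);
    {
        auto ref = makeRef(raw, owned);  // stands in for attach + teardown
    }
    const int result = destroyed;
    if (!owned) delete raw;  // the caller still owns an unowned adapter
    return result;
}
```

With `owned` set to false the caller remains responsible for deleting the adapter, and must do so only after the stream has finished with it.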
virtual
virtual bool detachSource(PacketSignal & source)
Detach a source by its packet signal. Disconnects the signal from the stream's process slot and removes the adapter entry.
source: The packet signal previously attached via attachSource(PacketSignal&).
Returns: true if the source was found and removed, false otherwise.
virtual
virtual bool detachSource(PacketStreamAdapter * source)
Detach a source by its adapter pointer. Disconnects the adapter's emitter from the stream's process slot and removes the entry.
source: Pointer to the adapter previously attached.
Returns: true if the source was found and removed, false otherwise.
virtual
virtual void attach(PacketProcessor * proc, int order, bool owned)
Attach a packet processor to the stream. Processors are executed in ascending order of their order value. Pass order = -1 to append at the end of the current processor list. Valid range is -1 to 101; values outside this range throw std::invalid_argument. Borrowed processors must finish with the packet before process() returns. Queues and other processors that defer work must report Cloned or Retained via retention() so upstream code has an explicit ownership boundary.
proc: The processor to attach; must not be null.
order: Position in the processing chain (lower runs first).
owned: If true the stream takes ownership and deletes the pointer on teardown.
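The ordering and range rules for attach() can be sketched as follows. This is an illustrative model only: `ProcessorList` and `Entry` are hypothetical, processors are represented by name, and treating -1 as "reuse the highest order seen so far" is an assumption about how appending might work, not a statement about the library's implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical record of an attached processor.
struct Entry {
    std::string name;
    int order;
};

class ProcessorList {
public:
    // Insert keeping ascending order; -1 appends after everything attached so far.
    void attach(std::string name, int order) {
        if (order < -1 || order > 101)
            throw std::invalid_argument("order must be between -1 and 101");
        if (order == -1) {
            // Assumption: appending reuses the current highest order value.
            const int last = entries_.empty() ? 0 : entries_.back().order;
            entries_.push_back(Entry{std::move(name), last});
            return;
        }
        // First entry with a strictly greater order; equal orders keep
        // their attachment sequence.
        auto pos = std::upper_bound(
            entries_.begin(), entries_.end(), order,
            [](int value, const Entry& e) { return value < e.order; });
        entries_.insert(pos, Entry{std::move(name), order});
    }

    // Processor names in execution order.
    std::vector<std::string> names() const {
        std::vector<std::string> out;
        for (const auto& e : entries_) out.push_back(e.name);
        return out;
    }

private:
    std::vector<Entry> entries_;
};
```

Validating the range before touching the list means a rejected call leaves the existing chain unchanged.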
inline
template<class C> inline void attach(std::shared_ptr< C > ptr, int order, bool syncState)
Attach a shared_ptr-managed processor to the stream. The stream shares ownership; the processor is kept alive at least until teardown. Throws std::runtime_error if ptr does not derive from PacketProcessor.
C: Processor type; must derive from PacketProcessor.
ptr: Shared pointer to the processor instance.
order: Position in the processing chain (lower runs first).
syncState: Reserved for future use; currently unused.
virtual
virtual bool detach(PacketProcessor * proc)
Detach a packet processor from the stream. The processor's delegate connections are removed; ownership is released if held.
proc: Pointer to the processor to remove.
Returns: true if the processor was found and removed, false otherwise.
virtual
virtual void synchronizeOutput(uv::Loop * loop)
Synchronize stream output packets with a libuv event loop. Internally attaches a SyncPacketQueue at order 101 so that all packets emitted by the processor chain are dispatched from the loop thread rather than the source thread. Must be called before start().
loop: The event loop to synchronize output onto; must not be null.
virtual
virtual void autoStart(bool flag)
Enable or disable auto-start behaviour (default: false). When enabled, the stream automatically transitions to Active state upon receiving the first packet while in the None or Locked state. Must be called before start().
flag: true to enable auto-start, false to disable.
virtual
virtual void closeOnError(bool flag)
Enable or disable close-on-error behaviour (default: true). When enabled, an unhandled processor exception causes the stream to transition from Error to Closed state automatically.
flag: true to close the stream on error, false to remain in Error state.
const std::exception_ptr & error()
Return the last captured exception, if the stream is in Error state. The pointer is null when no error has occurred.
Returns: A reference to the stored exception_ptr; empty if no error.
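Capturing a processor exception as a `std::exception_ptr` and inspecting it later is standard C++. The sketch below shows that pattern together with a close-on-error switch. `GuardedStage`, `State` and `describe` are hypothetical names; the real stream's state handling and signal emission are more involved than this.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical subset of stream states.
enum class State { Active, Error, Closed };

class GuardedStage {
public:
    explicit GuardedStage(bool closeOnError = true) : closeOnError_(closeOnError) {}

    // Run one unit of work; capture any exception instead of propagating it.
    void process(const std::function<void()>& work) {
        if (state_ != State::Active) return;
        try {
            work();
        } catch (...) {
            error_ = std::current_exception();
            state_ = State::Error;
            if (closeOnError_) state_ = State::Closed;  // Error, then Closed
        }
    }

    const std::exception_ptr& error() const { return error_; }
    State state() const { return state_; }

private:
    bool closeOnError_;
    State state_ = State::Active;
    std::exception_ptr error_;  // null until something throws
};

// Turn a stored exception_ptr into a message; empty string if none stored.
inline std::string describe(const std::exception_ptr& e) {
    if (!e) return "";
    try {
        std::rethrow_exception(e);
    } catch (const std::exception& ex) {
        return ex.what();
    } catch (...) {
        return "unknown";
    }
}
```

A caller would typically check the pointer for null before rethrowing, since `std::rethrow_exception` must not be called with a null `exception_ptr`.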
const
std::string name() const
Return the name assigned to this stream at construction.
Returns: The stream name; empty string if none was provided.
const
PacketAdapterVec adapters() const
Returns a combined list of all stream sources and processors.
const
PacketAdapterVec sources() const
Returns a list of all stream sources.
const
PacketAdapterVec processors() const
Returns a list of all stream processors.
const
int numSources() const
Return the number of source adapters currently registered.
Returns: Source count; thread-safe.
const
int numProcessors() const
Return the number of processor adapters currently registered.
Returns: Processor count; thread-safe.
const
int numAdapters() const
Return the total number of adapters (sources + processors).
Returns: Combined adapter count; thread-safe.
inline
template<class AdapterT> inline AdapterT * getSource(int index)
Return the nth source of type AdapterT, or nullptr if not found. Sources are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
AdapterT: Target type; must derive from PacketStreamAdapter.
index: Zero-based index among matching sources (default 0).
Returns: Pointer to the matching adapter, or nullptr.
inline
template<class AdapterT> inline AdapterT * getProcessor(int index)
Return the nth processor of type AdapterT, or nullptr if not found. Processors are searched in their registered order; only adapters that dynamic_cast successfully to AdapterT are counted.
AdapterT: Target type; must derive from PacketProcessor.
index: Zero-based index among matching processors (default 0).
Returns: Pointer to the matching processor, or nullptr.
inline
inline PacketProcessor * getProcessor(int order)
Return the processor registered at a specific order value. Unlike the template overload, this searches by order rather than by type and index.
order: The order value to match (default 0).
Returns: Pointer to the matching processor, or nullptr if none registered at that order.

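The "nth adapter of a given type" lookup described for getSource() and getProcessor() amounts to a linear scan that counts only successful `dynamic_cast` matches. A minimal sketch follows; the `Adapter`, `Encoder` and `Filter` types and the `nthOfType` helper are hypothetical and stand in for the library's adapter hierarchy.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical polymorphic adapter hierarchy.
struct Adapter {
    virtual ~Adapter() = default;
};
struct Encoder : Adapter {};
struct Filter : Adapter {
    explicit Filter(int i) : id(i) {}
    int id;
};

// Return the index-th adapter that casts to AdapterT, in registration order.
// Adapters of other types are skipped and do not advance the count.
template <class AdapterT>
AdapterT* nthOfType(const std::vector<std::unique_ptr<Adapter>>& list, int index = 0) {
    int seen = 0;
    for (const auto& adapter : list) {
        if (auto* match = dynamic_cast<AdapterT*>(adapter.get())) {
            if (seen++ == index) return match;
        }
    }
    return nullptr;
}

// Build a small mixed list: Filter(1), Encoder, Filter(2).
inline std::vector<std::unique_ptr<Adapter>> sampleAdapters() {
    std::vector<std::unique_ptr<Adapter>> list;
    list.push_back(std::make_unique<Filter>(1));
    list.push_back(std::make_unique<Encoder>());
    list.push_back(std::make_unique<Filter>(2));
    return list;
}
```

Because non-matching adapters are skipped, index 1 for `Filter` refers to the second filter in the list even though an `Encoder` sits between the two.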
| Return | Name | Description |
|---|---|---|
std::mutex | _mutex | |
std::mutex | _procMutex | |
std::string | _name | |
PacketAdapterVec | _sources | |
PacketAdapterVec | _processors | |
std::deque< PacketStreamState > | _states | |
std::exception_ptr | _error | |
bool | _autoStart | |
bool | _closeOnError | |
bool | _wired | |

std::mutex _mutex
std::mutex _procMutex
std::string _name
PacketAdapterVec _sources
PacketAdapterVec _processors
std::deque< PacketStreamState > _states
std::exception_ptr _error
bool _autoStart
bool _closeOnError
bool _wired

| Return | Name | Description |
|---|---|---|
void | setup | Attach the source and processor delegate chain. |
void | teardown | Detach the source and processor delegate chain. |
void | attachSource | |
void | attach | |
void | startSources | Start synchronized sources. |
void | stopSources | Stop synchronized sources. |
void | process virtual | Process incoming packets. |
void | emit | Emit the final packet to listeners. |
void | synchronizeStates | Synchronize queued states with adapters. |
void | onStateChange virtual | Override the Stateful::onStateChange method. |
void | assertCanModify | Asserts that the stream can be modified, i.e. is not in the Locked, Stopping or Active states. |
void | handleException | Handle an internal exception. |

void setup()
Attach the source and processor delegate chain.
void teardown()
Detach the source and processor delegate chain.
void attachSource(PacketAdapterReference::Ptr ref)
void attach(PacketAdapterReference::Ptr ref)
void startSources()
Start synchronized sources.
void stopSources()
Stop synchronized sources.
virtual
virtual void process(IPacket & packet)
Process incoming packets.
void emit(IPacket & packet)
Emit the final packet to listeners.
Synchronized signals such as Close and Error are sent from this method. See synchronizeOutput().
void synchronizeStates()
Synchronize queued states with adapters.
virtual
virtual void onStateChange(PacketStreamState & state, const PacketStreamState & oldState)
Override the Stateful::onStateChange method.
void assertCanModify()
Asserts that the stream can be modified, i.e. is not in the Locked, Stopping or Active states.
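The modification guard can be pictured as a check against three states. The sketch below is a hypothetical model: the `StreamState` enum mirrors only the state names mentioned in this reference, and throwing `std::runtime_error` from `requireModifiable` is an assumption, since the reference does not say how the assertion fails.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical enum using the state names mentioned in this reference.
enum class StreamState { None, Locked, Active, Stopping, Stopped, Closed, Error };

// True when adapters may be attached or detached in the given state.
inline bool canModify(StreamState s) {
    return s != StreamState::Locked &&
           s != StreamState::Stopping &&
           s != StreamState::Active;
}

// Assumption: the guard reports a violation by throwing.
inline void requireModifiable(StreamState s) {
    if (!canModify(s))
        throw std::runtime_error("stream cannot be modified in its current state");
}
```

A guard like this would be called at the top of every attach and detach path so that the adapter lists cannot change while packets are flowing.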
void handleException(std::exception & exc)
Handle an internal exception.

| Name | Description |
|---|---|
Ptr | |

std::shared_ptr< PacketStream > Ptr()