Data stream
This interface lets you safely stream experiment metrics to a database during execution, enabling you to monitor their progress.
Data streams use UNIX (local) and INET (network) datagram sockets: independent, self-contained messages whose delivery and ordering are not guaranteed. A checksum ensures the integrity of each message.
By default, the size of each message is limited to 1500 bytes with INET sockets and 4096 bytes with UNIX sockets.
The operating system provides a receive buffer for sockets; if the buffer is full, new messages are discarded.
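As a sketch of the underlying mechanism (plain Python stdlib sockets, not MLtraq's actual implementation), datagram sockets deliver independent, self-contained messages:

```python
import socket

# A connected pair of UNIX datagram sockets (available on POSIX systems).
server, client = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)

# Each send is one self-contained message: datagrams never merge or
# split, unlike TCP streams. Over a network (INET/UDP), delivery and
# ordering would not be guaranteed.
client.send(b'{"idx": 0, "v": 123.0}')
client.send(b'{"idx": 1, "v": 124.0}')

# Each recv returns exactly one datagram.
first = server.recv(4096)
second = server.recv(4096)
print(first)   # b'{"idx": 0, "v": 123.0}'
print(second)  # b'{"idx": 1, "v": 124.0}'

server.close()
client.close()
```

A local socket pair is reliable, so this demo only illustrates message framing; the lossy behavior described above applies to real network transports.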
The streaming of experiments transparently supports the Sequence objects stored as run.fields attributes. If a message is lost, its corresponding Sequence will miss the row associated with the corresponding idx value.
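Since each row carries its idx, a gap in the received idx values reveals a lost message. A minimal sketch of such a check (plain Python; this helper is hypothetical and not part of MLtraq):

```python
def missing_indices(received_idx: list[int]) -> list[int]:
    """Return the idx values absent from a streamed sequence.

    Assumes idx values start at 0 and increase by 1 per row.
    """
    if not received_idx:
        return []
    expected = set(range(max(received_idx) + 1))
    return sorted(expected - set(received_idx))

# Rows with idx 2 and 4 were lost in transit.
print(missing_indices([0, 1, 3, 5]))  # [2, 4]
```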
Tip
An experiment can be persisted after its execution, overwriting its partially streamed state; this resolves any data loss due to lost messages and also stores the fields that were not part of the streamed data.
Streaming is possible for experiments and runs already in the database, ensuring consistency over time. Only sequences that exist before calling experiment.execute(...) are streamed. The factory step init_sequences(...) lets you initialize the sequences you want to stream in a preliminary execution.
Example: Streaming sequences
This example shows how to stream sequences transparently:
Data stream example
[MainThread] Logging level set to DEBUG
[MainThread] Created DB link: 'sqlite:///mltraq.db'
[MainThread] Using backend: loky
[MainThread] Executing 1 tasks on 10 workers (backend:loky)
[MainThread] Persisting experiment (table name: experiment_example)
[MainThread] Created DB link: 'sqlite:///mltraq.db'
[MainThread] Listening on 'mltraq.sock' (UNIX)
[DataStreamServer] DataStreamServer: Waiting for messages
[MainThread] Using backend: loky
[MainThread] Executing 1 tasks on 10 workers (backend:loky)
[DatabaseWriter] Loading experiment id_experiment='d65df69e-1175-44a5-be2f-2232765703b8' name='None'
[DatabaseWriter] SQL: SELECT experiment_example.id_experiment, experiment_example.id_run, experiment_example.metrics
FROM experiment_example
[DatabaseWriter] Persisting experiment (table name: experiment_example)
[DatabaseWriter] DatabaseWriter: Processed 1 new messages for 1 experiments
[MainThread] DataStreamServer: Requested termination ...
[DataStreamServer] DataStreamServer: Cleanup ...
[DataStreamServer] DataStreamServer: Stats: count_messages=1 rate=0.98 messages/s
[DataStreamServer] DatabaseWriter: Requested termination ...
[DatabaseWriter] DatabaseWriter: No new messages to process
[DataStreamServer] DatabaseWriter: Terminated
[MainThread] DataStreamServer: Terminated
[MainThread] Loading experiment id_experiment='None' name='example'
[MainThread] SQL: SELECT experiment_example.id_experiment, experiment_example.id_run, experiment_example.metrics
FROM experiment_example
--
Streamed metrics:
idx timestamp v
0 0 2024-10-24 12:48:44.988341 123.0
Example: Streaming metrics with explicit activation
The following example demonstrates how to track and store metrics in the database during the execution of an experiment, explicitly enabling the data stream client.
- Configure a stream over the network, specifying the address to send the messages to and the server address
- Create and persist an experiment with a "metrics" Sequence attribute, using the parametrized step create_sequences
- Track and stream a metric v, relying on the context run.datastream_client()
- Receive the streamed data using the context session.datastream_server()
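The client/server pattern above can be sketched with plain stdlib UDP sockets (a conceptual illustration only; MLtraq's wire format and API differ):

```python
import json
import socket

# Server: bind an INET datagram socket on localhost.
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(("127.0.0.1", 0))          # port 0: let the OS pick a free port
address = server.getsockname()

# Client: stream one metric update per datagram, JSON-encoded.
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for idx in range(3):
    message = json.dumps({"idx": idx, "v": float(idx)}).encode()
    client.sendto(message, address)    # fire-and-forget: no delivery guarantee

# Server: receive and decode each message independently. The 1500-byte
# read size mirrors the default INET message limit mentioned earlier.
rows = [json.loads(server.recv(1500)) for _ in range(3)]
print(rows)

client.close()
server.close()
```

Each datagram stands alone, so the server can decode messages in any order and simply misses the rows whose datagrams were dropped.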
Data stream example
Streamed fields: ['metrics']
--
Streamed metrics:
idx timestamp v
0 0 2024-10-24 12:48:48.799337 0.0
1 1 2024-10-24 12:48:48.800667 1.0
2 2 2024-10-24 12:48:48.801798 2.0
3 3 2024-10-24 12:48:48.803150 3.0
4 4 2024-10-24 12:48:48.804498 4.0
5 5 2024-10-24 12:48:48.805844 5.0
6 6 2024-10-24 12:48:48.807191 6.0
7 7 2024-10-24 12:48:48.808320 7.0
8 8 2024-10-24 12:48:48.809664 8.0
9 9 2024-10-24 12:48:48.811008 9.0
--
Tracked fields: ['dataset', 'metrics']
The output includes the logs to ease the understanding of what happens in each thread: the main thread handles the execution of the experiment and of its run (parallelization is turned off to simplify the example), a second thread is the server listening for new messages, and a third thread is in charge of updating the experiments in the database.
The sequence run.metrics is streamed. run.dataset is not streamed, as it is not a Sequence; its tracked value becomes available only once the experiment is persisted after its execution is complete.