
Data stream#

This interface lets you safely stream experiment metrics to a database during execution, enabling you to monitor their progress.

Data streams use UNIX (local) and INET (network) socket datagrams: independent, self-contained messages whose delivery and delivery time are not guaranteed. A checksum ensures the integrity of each message.

By default, the size of each message is limited to 1500 bytes for INET sockets and 4096 bytes for UNIX sockets. The operating system buffers incoming datagrams; once the buffer is full, new messages are discarded.
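These fire-and-forget semantics can be illustrated with a minimal sketch using Python's standard socket module (illustrative only, not MLtraq code; UNIX sockets require a POSIX system):

import os
import socket
import tempfile

# Bind a UNIX datagram socket acting as the receiving server.
path = os.path.join(tempfile.mkdtemp(), "demo.sock")
server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
server.bind(path)

# The client sends a self-contained message and moves on:
# there is no connection, acknowledgment, or delivery guarantee.
client = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
client.sendto(b"metric v=123", path)

# Messages wait in the OS socket buffer until read;
# once the buffer is full, new messages are dropped.
print(server.recv(4096))  # b'metric v=123'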

The streaming of experiments transparently supports the Sequence objects stored as run.fields attributes. If a message is lost, the corresponding Sequence will miss the row associated with its idx value.
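Since idx values within a Sequence increase by 1 starting from 0 (as in the example outputs below), a rough way to detect dropped messages after streaming is to look for gaps in idx. A sketch, assuming a streamed experiment loaded as streamed_experiment with a non-empty metrics sequence:

# Collect the idx values present in the streamed sequence and
# report any gaps, which correspond to lost messages.
df = streamed_experiment.runs.first().fields.metrics.df()
missing = sorted(set(range(int(df["idx"].max()) + 1)) - set(df["idx"]))
if missing:
    print(f"Rows lost in transit, idx values: {missing}")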

Tip

An experiment can be persisted after its execution, overwriting its partially streamed state. This resolves any data loss due to lost messages and stores all other fields that were not part of the streamed data.
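For instance, the second example below ends with a persist that replaces the partially streamed copy in the database with the complete in-memory experiment:

# Overwrite the streamed state with the complete experiment.
experiment.persist(if_exists="replace")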

Streaming works only for experiments and runs that are already in the database, ensuring consistency over time. Only sequences that already exist before calling experiment.execute(...) are streamed. The factory step init_sequences(...) lets you initialize the sequences you want to stream in a preliminary execution, as shown in the example below.

Example: Streaming sequences#

This example shows how to stream sequences transparently:

Data stream example

from mltraq import Run, create_session, options
from mltraq.steps.init_sequences import init_sequences
from mltraq.utils.fs import tmpdir_ctx
from mltraq.utils.logging import logging_ctx

with options().ctx({"datastream.disable": False}), logging_ctx(
    level_name="DEBUG", log_format="[%(threadName)s] %(message)s"
), tmpdir_ctx():

    # Create a new experiment
    session = create_session("sqlite:///mltraq.db")
    experiment = session.create_experiment("example")

    # Add the sequence "metrics" and persist the experiment
    experiment.execute(init_sequences("metrics")).persist()

    def track(run: Run):
        """
        Track and stream a record in the `metrics` sequence.
        """
        run.fields.metrics.append(v=123)

    with session.datastream_server() as ds:
        # datastream_server() starts the threads that handle
        # the incoming messages, writing new records to the database.

        # Execute `track` step
        experiment.execute(track)

        # Make sure that the DatabaseWriter
        # processed at least one record.
        ds.dbw.received.wait()

    # Up to this point, we have not persisted the experiment; only
    # the streamed records have been written to the database.

    # Load experiment, showing the contents of the metrics Sequence.
    streamed_experiment = session.load_experiment("example")

    print("\n--\nStreamed metrics:")
    print(streamed_experiment.runs.first().fields.metrics.df())
Output
[MainThread] Logging level set to DEBUG
[MainThread] Created DB link: 'sqlite:///mltraq.db'
[MainThread] Using backend: loky
[MainThread] Executing 1 tasks on 10 workers (backend:loky)
[MainThread] Persisting experiment (table name: experiment_example)
[MainThread] Created DB link: 'sqlite:///mltraq.db'
[MainThread] Listening on 'mltraq.sock' (UNIX)
[DataStreamServer] DataStreamServer: Waiting for messages
[MainThread] Using backend: loky
[MainThread] Executing 1 tasks on 10 workers (backend:loky)
[DatabaseWriter] Loading experiment id_experiment='d65df69e-1175-44a5-be2f-2232765703b8' name='None'
[DatabaseWriter] SQL: SELECT experiment_example.id_experiment, experiment_example.id_run, experiment_example.metrics 
FROM experiment_example
[DatabaseWriter] Persisting experiment (table name: experiment_example)
[DatabaseWriter] DatabaseWriter: Processed 1 new messages for 1 experiments
[MainThread] DataStreamServer: Requested termination ...
[DataStreamServer] DataStreamServer: Cleanup ...
[DataStreamServer] DataStreamServer: Stats: count_messages=1 rate=0.98 messages/s
[DataStreamServer] DatabaseWriter: Requested termination ...
[DatabaseWriter] DatabaseWriter: No new messages to process
[DataStreamServer] DatabaseWriter: Terminated
[MainThread] DataStreamServer: Terminated
[MainThread] Loading experiment id_experiment='None' name='example'
[MainThread] SQL: SELECT experiment_example.id_experiment, experiment_example.id_run, experiment_example.metrics 
FROM experiment_example

--
Streamed metrics:
   idx                  timestamp      v
0    0 2024-10-24 12:48:44.988341  123.0

Example: Streaming metrics with explicit activation#

The following example demonstrates how to track and store metrics in the database during the execution of an experiment, explicitly enabling the datastream client:

  1. Configure a stream over the network, specifying the address the client sends messages to and the address the server listens on
  2. Create and persist an experiment with a "metrics" Sequence attribute using the parametrized step init_sequences
  3. Track and stream a metric v, relying on the context run.datastream_client()
  4. Receive the streamed data using the context session.datastream_server()

Data stream example

import numpy as np

from mltraq import Run, create_session, options
from mltraq.steps.init_sequences import init_sequences
from mltraq.utils.fs import tmpdir_ctx
from mltraq.utils.logging import logging_ctx

with options().ctx(
    {
        "datastream.cli_address": "127.0.0.1:9000",
        "datastream.srv_address": "127.0.0.1:9000",
        "datastream.kind": "INET",
    }
), logging_ctx(
    level_name="DEBUG", log_format="[%(threadName)s] %(message)s"
), tmpdir_ctx():

    # Create a new experiment
    session = create_session("sqlite:///mltraq.db")
    experiment = session.create_experiment("example")

    # Add the sequence "metrics" and persist the experiment
    experiment.execute(init_sequences("metrics"), n_jobs=1).persist(
        if_exists="replace"
    )

    def track(run: Run):
        """
        Add 10 values to the `metrics` sequence, streaming them.

        We also track a field `dataset`, which is not streamed
        as its type is not `Sequence`.
        """
        with run.datastream_client():
            run.fields.dataset = np.zeros(100)
            for v in range(10):
                run.fields.metrics.append(v=v)

    with session.datastream_server() as ds:
        # datastream_server() starts the threads that handle
        # the incoming messages, writing new records to the database.

        # Execute `track` step
        experiment.execute(track, n_jobs=1)

        # Make sure that the DatabaseWriter
        # processed at least one record.
        ds.dbw.received.wait()

    print("\n")

    # Up to this point, we have not persisted the experiment; only
    # the streamed records have been written to the database.

    # Load experiment, showing the contents of the metrics Sequence.
    streamed_experiment = session.load_experiment("example")

    # Show tracked fields. Only "metrics" is present,
    # `run.fields.dataset` is not a Sequence and therefore not streamed.
    print(
        "Streamed fields:",
        list(streamed_experiment.runs.first().fields.keys()),
    )
    print("--\n")

    print("Streamed metrics:")
    print(streamed_experiment.runs.first().fields.metrics.df())
    print("--\n")

    # Persist and reload the experiment, showing the tracked fields.
    # The copy with the streamed data is replaced by the complete experiment.
    experiment.persist(if_exists="replace")

    print(
        "Tracked fields:",
        list(
            session.load_experiment("example").runs.first().fields.keys()
        ),
    )
Output
Streamed fields: ['metrics']
--

Streamed metrics:
   idx                  timestamp    v
0    0 2024-10-24 12:48:48.799337  0.0
1    1 2024-10-24 12:48:48.800667  1.0
2    2 2024-10-24 12:48:48.801798  2.0
3    3 2024-10-24 12:48:48.803150  3.0
4    4 2024-10-24 12:48:48.804498  4.0
5    5 2024-10-24 12:48:48.805844  5.0
6    6 2024-10-24 12:48:48.807191  6.0
7    7 2024-10-24 12:48:48.808320  7.0
8    8 2024-10-24 12:48:48.809664  8.0
9    9 2024-10-24 12:48:48.811008  9.0
--

Tracked fields: ['dataset', 'metrics']

The output reports the logs to ease the understanding of what's happening in the threads: the main thread handles the execution of the experiment and of its runs (parallelization is turned off to simplify the example), a second thread is the server listening for new messages, and a third thread is in charge of updating the experiments on the database.

The sequence run.fields.metrics is streamed; run.fields.dataset is not, as it is not a Sequence. The tracked value of run.fields.dataset becomes available only once the experiment is persisted after its execution completes.