Pipelines
Pipelines provide a plug-and-play environment for quick prototyping and experimentation. Additionally, the port checks implemented by the framework ensure data consistency as connected processes run in sync with each other.
A pipeline expresses the data flow in a system. That data flow is the input for the workhorse of the framework, Sprokit. Sprokit handles every aspect of pipeline execution, including:
- Finding registered processes (including the out-of-the-box processes from Kwiver).
- Enforcing port checks based on the port_traits declared by each process.
- Scheduling and synchronizing the processes in a pipeline.
- Providing introspection capabilities during development for optimization.
- Providing tools for execution and visualization of the pipeline.
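The idea behind these port checks can be illustrated with a small, self-contained Python sketch. This is a toy model only, not the Kwiver/Sprokit API: a connection is accepted only when the traits declared on both ports agree.

```python
# Toy illustration (not the Kwiver API) of the port checks Sprokit performs:
# a connection is only allowed when the declared port traits match.

class Port:
    def __init__(self, name, data_type):
        self.name = name
        self.data_type = data_type  # the declared trait, e.g. "image"

class ToyPipeline:
    def __init__(self):
        self.connections = []

    def connect(self, out_port, in_port):
        # Enforce the port check: traits on both ends must agree.
        if out_port.data_type != in_port.data_type:
            raise TypeError(
                f"cannot connect {out_port.name} ({out_port.data_type}) "
                f"to {in_port.name} ({in_port.data_type})"
            )
        self.connections.append((out_port.name, in_port.name))

pipe = ToyPipeline()
pipe.connect(Port("exp.image", "image"), Port("yolo_v2.image", "image"))  # accepted
try:
    pipe.connect(Port("exp.timestamp", "timestamp"), Port("yolo_v2.image", "image"))
except TypeError as err:
    print("rejected:", err)
```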
Pipelines can be written as a plaintext file or embedded in a Python or C++ program. To execute a plaintext file, Kwiver provides pipeline_runner. Furthermore, a plaintext file can be translated into its Python/C++ counterpart using the bake utility provided by Kwiver. The semantics and design guidelines for pipelines are also provided by Kwiver.
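As a sketch of how such a file might be run, the invocation below uses a hypothetical file name, and exact flag names can vary across Kwiver versions; consult `pipeline_runner --help` for your installation:

```shell
# Run a plaintext pipeline file (hypothetical file name).
pipeline_runner --pipeline darknet_local.pipe

# Individual settings can typically be overridden on the command line:
pipeline_runner --pipeline darknet_local.pipe --setting input:exp_file=etc/rc3d_experiment.yml
```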
Plaintext
#
# Pipe file to test darknet locally
#
# Image configuration
config input
exp_file = etc/rc3d_experiment.yml
# ------------------------------------------------------------------
process exp :: diva_experiment
experiment_file_name = $CONFIG{input:exp_file}
# ------------------------------------------------------------------
# Darknet instance
process yolo_v2 :: image_object_detector
:detector:type darknet
:detector:darknet:net_config models/virat_auto.inference.cfg
:detector:darknet:weight_file models/virat_auto_final.weights
:detector:darknet:class_names models/virat.names
:detector:darknet:thresh 0.5
:detector:darknet:hier_thresh 0.5
:detector:darknet:gpu_index 0
:detector:darknet:resize_ni 1024
:detector:darknet:resize_nj 1024
connect from exp.image to yolo_v2.image
# -----------------------------------------------------------------
# CSV writer
process sink :: detected_object_output
:writer:type csv
file_name = test.csv
connect from yolo_v2.detected_object_set to sink.detected_object_set
# ------------------------------------------------------------------
# use python scheduler
config _scheduler
type = pythread_per_process
config _pipeline:_edge
capacity = 1 # set default edge capacity
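The `_pipeline:_edge` capacity setting above bounds the queue on each connection between processes, so a fast producer blocks until the downstream process catches up. A toy Python sketch of that backpressure, not the Sprokit implementation:

```python
import queue
import threading

# An edge with capacity 1: the producer blocks until the consumer drains it.
edge = queue.Queue(maxsize=1)
produced, consumed = [], []

def producer():
    for frame in range(3):
        edge.put(frame)        # blocks while the edge is at capacity
        produced.append(frame)

def consumer():
    for _ in range(3):
        consumed.append(edge.get())
        edge.task_done()

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed)  # frames arrive in order: [0, 1, 2]
```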
Python
# Create the experiment process (process_factory, pipeline and config come from
# Sprokit's Python bindings; the create_*_config helpers are defined elsewhere
# in the application)
exp_config = create_diva_experiment_config( exp_file )
exp_process = process_factory.create_process( "diva_experiment", "exp", exp_config )
# Create Image Object Detector Process
darknet_config = create_darknet_config( darknet_root, gpu )
yolo_process = process_factory.create_process( "image_object_detector", "yolo_v2", \
darknet_config )
# Create Detected object set output
detected_output_config = config.empty_config()
detected_output_config.set_value( "writer:type", "csv" )
detected_output_config.set_value( "file_name", csv_path )
detected_output_process = process_factory.create_process( "detected_object_output", "sink",
detected_output_config )
# Create pipeline out of these
darknet_pipeline = pipeline.Pipeline()
darknet_pipeline.add_process( exp_process )
darknet_pipeline.add_process( yolo_process )
darknet_pipeline.add_process( detected_output_process )
# Create connections
darknet_pipeline.connect( "exp", "image", \
"yolo_v2", "image" )
darknet_pipeline.connect( "yolo_v2", "detected_object_set", \
"sink", "detected_object_set" )
C++
// Coming Soon
Although Sprokit implicitly supports multi-threaded pipelines, those pipelines are restricted to a single machine. For distributed pipelines, the framework relies on ZeroMQ. These pipelines use the zmq_transport_send and zmq_transport_receive processes to communicate over the network. Currently, ZeroMQ-based pipelines support the publisher-subscriber pattern with multiple publishers and subscribers.
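The publisher-subscriber pattern can be sketched without ZeroMQ itself: every subscriber receives every message a publisher sends. The following toy Python model illustrates only the fan-out, not the `zmq_transport_send`/`zmq_transport_receive` processes:

```python
class ToyPublisher:
    """Minimal pub-sub fan-out: every subscriber sees every message."""
    def __init__(self):
        self.subscribers = []

    def subscribe(self, callback):
        self.subscribers.append(callback)

    def publish(self, serialized_message):
        # Deliver the message to every registered subscriber.
        for deliver in self.subscribers:
            deliver(serialized_message)

received_a, received_b = [], []
pub = ToyPublisher()
pub.subscribe(received_a.append)
pub.subscribe(received_b.append)

for msg in ("frame-0", "frame-1"):
    pub.publish(msg)

print(received_a, received_b)
```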
ZeroMQ Sender
#
# Pipe file for input
#
# Image configuration
config input
exp_file = etc/video_experiment.yml
# ------------------------------------------------------------------
process exp :: diva_experiment
experiment_file_name = $CONFIG{input:exp_file}
# ------------------------------------------------------------------
process protobuf_ser :: serializer
serialization_type = protobuf
connect from exp.timestamp to protobuf_ser.timg/timestamp
connect from exp.image to protobuf_ser.timg/image
connect from exp.file_name to protobuf_ser.timg/file_name
process zmq_timg :: zmq_transport_send
port = 5560
expected_subscribers=1
connect from protobuf_ser.timg to zmq_timg.serialized_message
config _scheduler
type = pythread_per_process
config _pipeline:_edge
capacity = 1 # set default edge capacity
ZeroMQ Receiver
#
# Pipe file for darknet
#
process zmq :: zmq_transport_receive
port = 5560
num_publishers = 1
# --------------------------------------------------
process dser :: deserializer
serialization_type = protobuf
connect from zmq.serialized_message to dser.timg
# ---------------------------------------------------
# Darknet instance
process yolo_v2 :: image_object_detector
:detector:type darknet
:detector:darknet:net_config models/virat_auto.inference.cfg
:detector:darknet:weight_file models/virat_auto_final.weights
:detector:darknet:class_names models/virat.names
:detector:darknet:thresh 0.5
:detector:darknet:hier_thresh 0.5
:detector:darknet:gpu_index 0
:detector:darknet:resize_ni 1024
:detector:darknet:resize_nj 1024
connect from dser.timg/image to yolo_v2.image
# ---------------------------------------------------
# CSV writer
process sink :: detected_object_output
:writer:type csv
file_name = test.csv
connect from yolo_v2.detected_object_set to sink.detected_object_set
# ------------------------------------------------------------------
# use python scheduler
config _scheduler
type = pythread_per_process
config _pipeline:_edge
capacity = 1 # set default edge capacity
Every algorithm present in DIVA has a dedicated local pipeline and a ZeroMQ pipeline that replicate the offline behavior of the algorithm in an online environment, on a single system or across multiple systems. These pipelines are documented pictorially below for ease of understanding.