Pipelines

Pipelines provide a plug-and-play environment for quick prototyping and experimentation. Additionally, the port checks implemented by the framework ensure data consistency as processes are connected to work in sync with each other.

A pipeline expresses the data flow in a system. This data flow is the input for the workhorse of the framework, Sprokit. Sprokit handles every aspect of pipeline execution, including:

  1. Finding registered processes (including the out-of-the-box processes shipped with Kwiver).
  2. Enforcing port checks based on the port_traits declared by each process.
  3. Scheduling and synchronizing the processes in a pipeline.
  4. Providing introspection capabilities during development for optimization.
  5. Providing tools for executing and visualizing the pipeline.
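The port checks in point 2 are what make plug-and-play composition safe: a connection is only accepted when the declared types of the two ports agree. The sketch below illustrates the idea in plain Python; the `Port` class and `connect` function are illustrative stand-ins, not Sprokit's actual API.

```python
# Minimal sketch of trait-based port checking (illustrative only;
# Sprokit's real implementation lives in C++ and is more elaborate).

class Port:
    """A named port carrying a declared data type, e.g. 'kwiver:image'."""
    def __init__(self, name, data_type):
        self.name = name
        self.data_type = data_type

def connect(upstream, downstream):
    """Refuse connections whose declared types disagree."""
    if upstream.data_type != downstream.data_type:
        raise TypeError(
            f"cannot connect {upstream.name} ({upstream.data_type}) "
            f"to {downstream.name} ({downstream.data_type})"
        )
    return (upstream, downstream)

# Mirrors 'connect from exp.image to yolo_v2.image' in the pipe files.
edge = connect(Port("exp.image", "kwiver:image"),
               Port("yolo_v2.image", "kwiver:image"))
```

A mismatched pair, such as an image port wired to a timestamp port, fails at pipeline construction time rather than mid-run.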

Pipelines can be written as a plaintext file or embedded in a Python or C++ program. To execute a plaintext file, Kwiver provides pipeline_runner. Furthermore, a plaintext file can be translated into its Python/C++ counterpart using the bake utility provided by Kwiver. The semantics and design guidelines for pipelines are provided by Kwiver.

Plaintext

#
# Pipe file to test darknet locally
#

# Image configuration
config input 
  exp_file = etc/rc3d_experiment.yml 

# ------------------------------------------------------------------
process exp :: diva_experiment
  experiment_file_name = $CONFIG{input:exp_file}

# ------------------------------------------------------------------
# Darknet instance
process yolo_v2 :: image_object_detector
  :detector:type darknet
  :detector:darknet:net_config  models/virat_auto.inference.cfg
  :detector:darknet:weight_file models/virat_auto_final.weights
  :detector:darknet:class_names models/virat.names
  :detector:darknet:thresh  0.5
  :detector:darknet:hier_thresh  0.5
  :detector:darknet:gpu_index  0
  :detector:darknet:resize_ni  1024
  :detector:darknet:resize_nj  1024

connect from  exp.image to yolo_v2.image

# -----------------------------------------------------------------
# CSV writer 
process sink :: detected_object_output
  :writer:type  csv
  file_name = test.csv

connect from yolo_v2.detected_object_set to sink.detected_object_set

# ------------------------------------------------------------------
# use python scheduler
config _scheduler
  type = pythread_per_process

config _pipeline:_edge
  capacity = 1     # set default edge capacity
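The `$CONFIG{input:exp_file}` reference above is a macro that pulls a value from another config block at load time. A rough sketch of that substitution in plain Python (illustrative only; Sprokit's real config parser also handles nesting, relative paths, and other macro forms):

```python
import re

# Illustrative sketch of $CONFIG{block:key} macro expansion.
# The flat dict below stands in for the parsed config blocks.
CONFIG = {"input:exp_file": "etc/rc3d_experiment.yml"}

def expand(value, config):
    """Replace every $CONFIG{block:key} with the referenced value."""
    return re.sub(
        r"\$CONFIG\{([^}]+)\}",
        lambda m: config[m.group(1)],
        value,
    )

expanded = expand("$CONFIG{input:exp_file}", CONFIG)
```

This is why the `config input` block must define `exp_file` before the `exp` process can be configured.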

Python

exp_config = create_diva_experiment_config( exp_file )
exp_process = process_factory.create_process( "diva_experiment", "exp", exp_config ) 

# Create Image Object Detector Process
darknet_config = create_darknet_config( darknet_root, gpu ) 
yolo_process = process_factory.create_process( "image_object_detector", "yolo_v2", \
                                                                    darknet_config )

# Create Detected object set output
detected_output_config = config.empty_config()
detected_output_config.set_value( "writer:type", "csv" )
detected_output_config.set_value( "file_name", csv_path )
detected_output_process = process_factory.create_process( "detected_object_output", "sink",
                                                            detected_output_config )
# Create pipeline out of these
darknet_pipeline = pipeline.Pipeline()
darknet_pipeline.add_process( exp_process )
darknet_pipeline.add_process( yolo_process )
darknet_pipeline.add_process( detected_output_process )

# Create connections
darknet_pipeline.connect( "exp", "image", \
                             "yolo_v2", "image" )
darknet_pipeline.connect( "yolo_v2", "detected_object_set", \
                                  "sink", "detected_object_set" )

C++

// Coming Soon

Although Sprokit implicitly supports multi-threaded pipelines, those pipelines are restricted to a single machine. For distributed pipelines, the framework relies on ZeroMQ. These pipelines use the zmq_transport_send and zmq_transport_receive processes to communicate over the network. Currently, ZeroMQ-based pipelines support the publisher-subscriber pattern with multiple publishers and subscribers.
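The key property of the publisher-subscriber pattern is fan-out: every subscriber receives its own copy of each published message. The in-process sketch below only mirrors that message flow; the real transport processes exchange serialized messages over ZeroMQ TCP sockets, and the class names here are illustrative.

```python
# In-process sketch of the PUB/SUB flow used by zmq_transport_send /
# zmq_transport_receive (illustrative only; the real processes move
# serialized bytes over ZeroMQ TCP sockets).

from collections import deque

class Publisher:
    def __init__(self):
        self._subscribers = []

    def accept(self, subscriber):
        """Register a subscriber (cf. expected_subscribers in the pipe file)."""
        self._subscribers.append(subscriber)

    def publish(self, payload):
        """Fan the message out: every subscriber gets its own copy."""
        for sub in self._subscribers:
            sub.inbox.append(payload)

class Subscriber:
    def __init__(self):
        self.inbox = deque()

    def receive(self):
        return self.inbox.popleft()

pub = Publisher()
sub1, sub2 = Subscriber(), Subscriber()
pub.accept(sub1)
pub.accept(sub2)
pub.publish(b"serialized_message")
```

The `expected_subscribers` and `num_publishers` settings in the pipe files below let the transport processes block until the whole topology is connected before data starts flowing.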

ZeroMQ Sender

#
# Pipe file for input
#

# Image configuration
config input 
 exp_file = etc/video_experiment.yml 


# ------------------------------------------------------------------
process exp :: diva_experiment
  experiment_file_name = $CONFIG{input:exp_file}

  
# ------------------------------------------------------------------
process protobuf_ser :: serializer
  serialization_type = protobuf 

connect from exp.timestamp to protobuf_ser.timg/timestamp
connect from exp.image     to protobuf_ser.timg/image
connect from exp.file_name to protobuf_ser.timg/file_name

process zmq_timg :: zmq_transport_send
  port = 5560
  expected_subscribers=1

connect from protobuf_ser.timg to zmq_timg.serialized_message


config _scheduler
  type = pythread_per_process

config _pipeline:_edge
  capacity = 1     # set default edge capacity
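The serializer above groups several typed ports (`timg/timestamp`, `timg/image`, `timg/file_name`) into a single message suitable for transport. A minimal stand-in for that serialize/deserialize pair, using json in place of protobuf (the real processes use protobuf schemas registered with Kwiver):

```python
import json

# Stand-in for the serializer/deserializer process pair. json replaces
# protobuf here purely for illustration; field names mirror the 'timg/*'
# port group in the pipe file above.

def serialize(timestamp, image, file_name):
    """Bundle the grouped ports into one transportable byte string."""
    return json.dumps({
        "timestamp": timestamp,
        "image": image,          # real pipelines carry binary image data
        "file_name": file_name,
    }).encode("utf-8")

def deserialize(message):
    """Split a received message back into its named elements."""
    return json.loads(message.decode("utf-8"))

msg = serialize(42, "<image bytes>", "frame_000042.png")
fields = deserialize(msg)
```

On the receiving side, the deserializer performs the inverse step, re-exposing each element on its own typed port.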

ZeroMQ Receiver

#
# Pipe file for darknet
#

process zmq :: zmq_transport_receive
  port = 5560
  num_publishers = 1

# --------------------------------------------------
process dser :: deserializer
  serialization_type = protobuf

connect from zmq.serialized_message to dser.timg

# ---------------------------------------------------
# Darknet instance
process yolo_v2 :: image_object_detector
  :detector:type darknet
  :detector:darknet:net_config  models/virat_auto.inference.cfg
  :detector:darknet:weight_file models/virat_auto_final.weights
  :detector:darknet:class_names models/virat.names
  :detector:darknet:thresh  0.5
  :detector:darknet:hier_thresh  0.5
  :detector:darknet:gpu_index  0
  :detector:darknet:resize_ni  1024
  :detector:darknet:resize_nj  1024

connect from dser.timg/image to yolo_v2.image

# ---------------------------------------------------
# CSV writer 
process sink :: detected_object_output
  :writer:type  csv
  file_name = test.csv

connect from yolo_v2.detected_object_set to sink.detected_object_set

# ------------------------------------------------------------------
# use python scheduler
config _scheduler
  type = pythread_per_process

config _pipeline:_edge
  capacity = 1     # set default edge capacity

Every algorithm present in DIVA has a dedicated local and ZeroMQ pipeline to replicate its offline behavior in an online environment, either on a single system or across multiple systems. These pipelines are documented pictorially below for ease of understanding.

Local Pipelines

Object Detection

strict digraph "unnamed" {
clusterrank=local;

subgraph "cluster_exp" {
color=white;style=filled;fillcolor=white;

"exp_main" [label="exp\n:: diva_experiment",shape=ellipse,rank=same];


"exp_output_file_name" [label="file_name\n:: kwiver:file_name",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_file_name" [arrowhead=none,color=black];
"exp_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_image" [arrowhead=none,color=black];
"exp_output_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_timestamp" [arrowhead=none,color=black];

}

subgraph "cluster_sink" {
color=white;style=filled;fillcolor=white;

"sink_main" [label="sink\n:: detected_object_output",shape=ellipse,rank=same];

"sink_input_detected_object_set" [label="detected_object_set\n:: kwiver:detected_object_set",shape=house,rank=same,fontsize=10];
"sink_input_detected_object_set" -> "sink_main" [arrowhead=none,color=black];
"sink_input_image_file_name" [label="image_file_name\n:: kwiver:file_name",shape=house,rank=same,fontsize=10];
"sink_input_image_file_name" -> "sink_main" [arrowhead=none,color=black];


}

subgraph "cluster_yolo_v2" {
color=white;style=filled;fillcolor=white;

"yolo_v2_main" [label="yolo_v2\n:: image_object_detector",shape=ellipse,rank=same];

"yolo_v2_input_image" [label="image\n:: kwiver:image",shape=house,rank=same,fontsize=10];
"yolo_v2_input_image" -> "yolo_v2_main" [arrowhead=none,color=black];

"yolo_v2_output_detected_object_set" [label="detected_object_set\n:: kwiver:detected_object_set",shape=invhouse,rank=same,fontsize=10];
"yolo_v2_main" -> "yolo_v2_output_detected_object_set" [arrowhead=none,color=black];

}

"exp_output_image" -> "yolo_v2_input_image" [minlen=1,color=black,weight=1];
"yolo_v2_output_detected_object_set" -> "sink_input_detected_object_set" [minlen=1,color=black,weight=1];

}

Optical Flow

strict digraph "unnamed" {
clusterrank=local;

subgraph "cluster_exp" {
color=white;style=filled;fillcolor=white;

"exp_main" [label="exp\n:: diva_experiment",shape=ellipse,rank=same];


"exp_output_file_name" [label="file_name\n:: kwiver:file_name",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_file_name" [arrowhead=none,color=black];
"exp_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_image" [arrowhead=none,color=black];
"exp_output_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_timestamp" [arrowhead=none,color=black];

}

subgraph "cluster_flow" {
color=white;style=filled;fillcolor=white;

"flow_main" [label="flow\n:: optical_flow",shape=ellipse,rank=same];

"flow_input_image" [label="image\n:: kwiver:image",shape=house,rank=same,fontsize=10];
"flow_input_image" -> "flow_main" [arrowhead=none,color=black];
"flow_input_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=house,rank=same,fontsize=10];
"flow_input_timestamp" -> "flow_main" [arrowhead=none,color=black];

"flow_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"flow_main" -> "flow_output_image" [arrowhead=none,color=black];

}

subgraph "cluster_writer" {
color=white;style=filled;fillcolor=white;

"writer_main" [label="writer\n:: image_writer",shape=ellipse,rank=same];

"writer_input_image" [label="image\n:: kwiver:image",shape=house,rank=same,fontsize=10];
"writer_input_image" -> "writer_main" [arrowhead=none,color=black];
"writer_input_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=house,rank=same,fontsize=10];
"writer_input_timestamp" -> "writer_main" [arrowhead=none,color=black];


}

"exp_output_image" -> "flow_input_image" [minlen=1,color=black,weight=1];
"exp_output_timestamp" -> "flow_input_timestamp" [minlen=1,color=black,weight=1];
"flow_output_image" -> "writer_input_image" [minlen=1,color=black,weight=1];

}

ZeroMQ Pipelines

Image Sender

strict digraph "unnamed" {
clusterrank=local;

subgraph "cluster_exp" {
color=white;style=filled;fillcolor=white;

"exp_main" [label="exp\n:: diva_experiment",shape=ellipse,rank=same];


"exp_output_file_name" [label="file_name\n:: kwiver:file_name",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_file_name" [arrowhead=none,color=black];
"exp_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_image" [arrowhead=none,color=black];
"exp_output_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_timestamp" [arrowhead=none,color=black];

}

subgraph "cluster_protobuf_ser" {
color=white;style=filled;fillcolor=white;

"protobuf_ser_main" [label="protobuf_ser\n:: serializer",shape=ellipse,rank=same];

"protobuf_ser_input_timg/file_name" [label="timg/file_name\n:: _flow_dependent/",shape=house,rank=same,fontsize=10];
"protobuf_ser_input_timg/file_name" -> "protobuf_ser_main" [arrowhead=none,color=black];
"protobuf_ser_input_timg/image" [label="timg/image\n:: _flow_dependent/",shape=house,rank=same,fontsize=10];
"protobuf_ser_input_timg/image" -> "protobuf_ser_main" [arrowhead=none,color=black];
"protobuf_ser_input_timg/timestamp" [label="timg/timestamp\n:: _flow_dependent/",shape=house,rank=same,fontsize=10];
"protobuf_ser_input_timg/timestamp" -> "protobuf_ser_main" [arrowhead=none,color=black];

"protobuf_ser_output_timg" [label="timg\n:: kwiver:serialized_message",shape=invhouse,rank=same,fontsize=10];
"protobuf_ser_main" -> "protobuf_ser_output_timg" [arrowhead=none,color=black];

}

subgraph "cluster_zmq_timg" {
color=white;style=filled;fillcolor=white;

"zmq_timg_main" [label="zmq_timg\n:: zmq_transport_send",shape=ellipse,rank=same];

"zmq_timg_input_serialized_message" [label="serialized_message\n:: kwiver:serialized_message",shape=house,rank=same,fontsize=10];
"zmq_timg_input_serialized_message" -> "zmq_timg_main" [arrowhead=none,color=black];


}

"exp_output_file_name" -> "protobuf_ser_input_timg/file_name" [minlen=1,color=black,weight=1];
"exp_output_image" -> "protobuf_ser_input_timg/image" [minlen=1,color=black,weight=1];
"exp_output_timestamp" -> "protobuf_ser_input_timg/timestamp" [minlen=1,color=black,weight=1];
"protobuf_ser_output_timg" -> "zmq_timg_input_serialized_message" [minlen=1,color=black,weight=1];

}