Pipelines

Pipelines provide a plug-and-play environment for quick prototyping and experimentation. Additionally, the port checks implemented by the framework ensure data consistency, as the connected processes work in sync with each other.

A pipeline expresses the data flow in a system. This data flow is the input for the workhorse of the framework, Sprokit. Sprokit handles every aspect of pipeline execution. This includes

  1. Finding registered processes (including out-of-the-box processes from Kwiver).

  2. Enforcing port checks based on the port_traits declared by the process.

  3. Scheduling and synchronizing the processes in a pipeline.

  4. Providing introspection capabilities during development for optimization.

  5. Providing tools for execution and visualization of the pipeline.
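
The responsibilities above can be sketched in plain Python. This is a conceptual illustration only, not Sprokit code: the `Process` and `Pipeline` classes and the port-type strings are hypothetical stand-ins showing how a port check on connect (item 2), a bounded edge, and one thread per process (as in the pythread_per_process scheduler, item 3) fit together.

```python
# Conceptual sketch only -- NOT the Sprokit API.
import queue
import threading

class Process:
    def __init__(self, name, input_ports=None, output_ports=None):
        # Port name -> declared type string, analogous to port_traits.
        self.name = name
        self.input_ports = input_ports or {}
        self.output_ports = output_ports or {}

class Pipeline:
    def __init__(self, edge_capacity=1):
        # Like `config _pipeline:_edge  capacity = 1` in a pipe file.
        self.edge_capacity = edge_capacity
        self.edges = {}

    def connect(self, src, src_port, dst, dst_port):
        # The "port check": declared types on both ends must agree.
        src_type = src.output_ports[src_port]
        dst_type = dst.input_ports[dst_port]
        if src_type != dst_type:
            raise TypeError(f"{src.name}.{src_port} ({src_type}) does not "
                            f"match {dst.name}.{dst_port} ({dst_type})")
        edge = queue.Queue(maxsize=self.edge_capacity)
        self.edges[(src.name, src_port)] = edge
        return edge

exp = Process("exp", output_ports={"image": "kwiver:image"})
yolo = Process("yolo_v2", input_ports={"image": "kwiver:image"})
sink = Process("sink",
               input_ports={"detected_object_set": "kwiver:detected_object_set"})

pipe = Pipeline()
edge = pipe.connect(exp, "image", yolo, "image")  # types agree: accepted

# Connecting mismatched ports fails the port check.
try:
    pipe.connect(exp, "image", sink, "detected_object_set")
except TypeError as err:
    print("rejected:", err)

# One thread per process, synchronized through the bounded edge.
frames = ["frame0", "frame1", "frame2"]
received = []
producer = threading.Thread(target=lambda: [edge.put(f) for f in frames])
consumer = threading.Thread(
    target=lambda: [received.append(edge.get()) for _ in frames])
producer.start(); consumer.start()
producer.join(); consumer.join()
```

Because the edge has capacity 1, the producer thread blocks until the consumer drains it, which is the back-pressure mechanism that keeps processes in sync.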

Pipelines can be written as a plaintext file or embedded in a Python or C++ program. To execute a plaintext file, Kwiver provides the pipeline_runner tool. Furthermore, plaintext files can be translated into their Python/C++ counterparts using the bake utility provided by Kwiver. The semantics and design guidelines for pipelines are provided by Kwiver.

Plaintext

#
# Pipe file to test darknet locally
#

# Image configuration
config input 
  exp_file = etc/rc3d_experiment.yml 

# ------------------------------------------------------------------
process exp :: diva_experiment
  experiment_file_name = $CONFIG{input:exp_file}

# ------------------------------------------------------------------
# Darknet instance
process yolo_v2 :: image_object_detector
  :detector:type darknet
  :detector:darknet:net_config  models/virat_auto.inference.cfg
  :detector:darknet:weight_file models/virat_auto_final.weights
  :detector:darknet:class_names models/virat.names
  :detector:darknet:thresh  0.5
  :detector:darknet:hier_thresh  0.5
  :detector:darknet:gpu_index  0
  :detector:darknet:resize_ni  1024
  :detector:darknet:resize_nj  1024

connect from  exp.image to yolo_v2.image

# -----------------------------------------------------------------
# CSV writer 
process sink :: detected_object_output
  :writer:type  csv
  file_name = test.csv

connect from yolo_v2.detected_object_set to sink.detected_object_set

# ------------------------------------------------------------------
# use python scheduler
config _scheduler
  type = pythread_per_process

config _pipeline:_edge
  capacity = 1     # set default edge capacity

Python

exp_config = create_diva_experiment_config( exp_file )
exp_process = process_factory.create_process( "diva_experiment", "exp", exp_config ) 

# Create Image Object Detector Process
darknet_config = create_darknet_config( darknet_root, gpu ) 
yolo_process = process_factory.create_process( "image_object_detector", "yolo_v2", \
                                                                    darknet_config )

# Create Detected object set output
detected_output_config = config.empty_config()
detected_output_config.set_value( "writer:type", "csv" )
detected_output_config.set_value( "file_name", csv_path )
detected_output_process = process_factory.create_process( "detected_object_output", "sink",
                                                            detected_output_config )
# Create pipeline out of these
darknet_pipeline = pipeline.Pipeline()
darknet_pipeline.add_process( exp_process )
darknet_pipeline.add_process( yolo_process )
darknet_pipeline.add_process( detected_output_process )

# Create connections
darknet_pipeline.connect( "exp", "image", \
                             "yolo_v2", "image" )
darknet_pipeline.connect( "yolo_v2", "detected_object_set", \
                                  "sink", "detected_object_set" )

C++

// Coming Soon

The DIVA framework also provides processes (via Kwiver) for mapping annotations between cameras and rendering the results. The following example pipeline demonstrates how to ingest KPF formatted annotations, map them between two cameras, and draw the resulting annotations.

Plaintext

# This example pipeline demonstrates how take annotations (in KPF
# format) and the corresponding video, and render the annotations onto
# the video frames

# ================================================================
process video
  :: video_input

  # File path to the input video
  video_filename = G336.avi
  video_reader:type = ffmpeg

# ================================================================
process kpf_in
  :: detected_object_input

  # KPF formatted annotation file path
  file_name = G336.geom.yml
  reader:type = kpf_input

# ================================================================
process draw
  :: draw_detected_object_set

  draw_algo:type = ocv
  draw_algo:ocv:default_line_thickness = 3

# ================================================================
process imagewriter
  :: image_writer

  # Writes out the annotated video frames from the destination
  # camera, starting from image00001.jpg.  Note that frames will
  # only be written out while there are annotations to consume
  file_name_template = G336_output_frames/image%05d.jpg
  image_writer:type = ocv

# ================================================================
# connections
connect from video.image
        to draw.image

connect from kpf_in.detected_object_set
        to draw.detected_object_set

connect from draw.image
        to imagewriter.image

# -- end of file --

Although Sprokit implicitly supports multi-threaded pipelines, those pipelines are restricted to a single machine. The framework relies on ZeroMQ to provide distributed pipelines. These pipelines use the zmq_transport_send and zmq_transport_receive processes to communicate over the network. Currently, ZeroMQ based pipelines support the publisher-subscriber pattern with multiple publishers and subscribers.
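
The idea of the transport pair can be sketched with only the Python standard library. This is a conceptual stand-in, not KWIVER code: a real deployment uses ZeroMQ sockets, and the length-prefix framing below is an assumption for illustration, not KWIVER's actual wire format.

```python
# Conceptual stand-in for zmq_transport_send / zmq_transport_receive.
# A "sender" thread frames serialized messages with a 4-byte length
# prefix; a "receiver" reads them back off a local TCP socket.
import socket
import struct
import threading

def send_messages(conn, messages):
    for payload in messages:
        conn.sendall(struct.pack(">I", len(payload)) + payload)
    conn.close()

def recv_exact(conn, n):
    # TCP delivers a byte stream, so loop until n bytes have arrived.
    buf = b""
    while len(buf) < n:
        chunk = conn.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("socket closed early")
        buf += chunk
    return buf

def receive_messages(conn, count):
    out = []
    for _ in range(count):
        (length,) = struct.unpack(">I", recv_exact(conn, 4))
        out.append(recv_exact(conn, length))
    return out

# Wire up a local "publisher" and "subscriber".
server = socket.socket()
server.bind(("127.0.0.1", 0))     # like `port = 5560`, but ephemeral
server.listen(1)
port = server.getsockname()[1]

sub = socket.socket()
sub.connect(("127.0.0.1", port))
pub, _ = server.accept()

msgs = [b"serialized frame 1", b"serialized frame 2"]
sender = threading.Thread(target=send_messages, args=(pub, msgs))
sender.start()
received = receive_messages(sub, len(msgs))
sender.join()
sub.close(); server.close()
```

The serialized_message port in the pipe files below carries exactly this kind of opaque byte payload; the serializer/deserializer processes on either side of the transport produce and consume it.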

ZeroMQ Sender

#
# Pipe file for input
#

# Image configuration
config input 
 exp_file = etc/video_experiment.yml 


# ------------------------------------------------------------------
process exp :: diva_experiment
  experiment_file_name = $CONFIG{input:exp_file}

  
# ------------------------------------------------------------------
process protobuf_ser :: serializer
  serialization_type = protobuf 

connect from exp.timestamp to protobuf_ser.timg/timestamp
connect from exp.image     to protobuf_ser.timg/image
connect from exp.file_name to protobuf_ser.timg/file_name

process zmq_timg :: zmq_transport_send
  port = 5560
  expected_subscribers=1

connect from protobuf_ser.timg to zmq_timg.serialized_message


config _scheduler
  type = pythread_per_process

config _pipeline:_edge
  capacity = 1     # set default edge capacity
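
What the serializer does with the timg port group can be sketched as a simple round trip. This is a conceptual illustration using JSON in place of the protobuf backend; the field names mirror the grouped ports above but are not KWIVER's actual schema.

```python
# Conceptual sketch of the serializer/deserializer pair for the "timg"
# port group (timestamp, image, file_name). Real pipelines use the
# protobuf serialization_type, not JSON.
import base64
import json

def serialize_timg(timestamp, image_bytes, file_name):
    # Pack the three grouped input ports into one serialized_message.
    return json.dumps({
        "timestamp": timestamp,
        "image": base64.b64encode(image_bytes).decode("ascii"),
        "file_name": file_name,
    }).encode("utf-8")

def deserialize_timg(message):
    # Split the serialized_message back into its grouped ports.
    d = json.loads(message.decode("utf-8"))
    return d["timestamp"], base64.b64decode(d["image"]), d["file_name"]

msg = serialize_timg(42, b"\x00\x01 raw pixel bytes", "frame_00042.png")
ts, img, name = deserialize_timg(msg)
```

Grouping the ports this way is what lets a single zmq edge carry a timestamp, an image, and a file name as one atomic message, which the receiver's deserializer then fans back out to separate ports.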

ZeroMQ Receiver

#
# Pipe file for darknet
#

process zmq :: zmq_transport_receive
  port = 5560
  num_publishers = 1

# --------------------------------------------------
process dser :: deserializer
        serialization_type = protobuf 

connect from zmq.serialized_message to dser.timg

# ---------------------------------------------------
# Darknet instance
process yolo_v2 :: image_object_detector
  :detector:type darknet
  :detector:darknet:net_config  models/virat_auto.inference.cfg
  :detector:darknet:weight_file models/virat_auto_final.weights
  :detector:darknet:class_names models/virat.names
  :detector:darknet:thresh  0.5
  :detector:darknet:hier_thresh  0.5
  :detector:darknet:gpu_index  0
  :detector:darknet:resize_ni  1024
  :detector:darknet:resize_nj  1024

connect from  dser.timg.image to yolo_v2.image

# ---------------------------------------------------
# CSV writer 
process sink :: detected_object_output
  :writer:type  csv
  file_name = test.csv

connect from yolo_v2.detected_object_set to sink.detected_object_set

# ------------------------------------------------------------------
# use python scheduler
config _scheduler
  type = pythread_per_process

config _pipeline:_edge
  capacity = 1     # set default edge capacity

Every algorithm in DIVA has a dedicated local and ZeroMQ pipeline to replicate the offline behavior of the algorithm in an online environment, either on a single system or across multiple systems. These pipelines are documented pictorially below for ease of understanding.

Local Pipelines

Object Detection

strict digraph "unnamed" {
clusterrank=local;

subgraph "cluster_exp" {
color=white;style=filled;fillcolor=white;

"exp_main" [label="exp\n:: diva_experiment",shape=ellipse,rank=same];


"exp_output_file_name" [label="file_name\n:: kwiver:file_name",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_file_name" [arrowhead=none,color=black];
"exp_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_image" [arrowhead=none,color=black];
"exp_output_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_timestamp" [arrowhead=none,color=black];

}

subgraph "cluster_sink" {
color=white;style=filled;fillcolor=white;

"sink_main" [label="sink\n:: detected_object_output",shape=ellipse,rank=same];

"sink_input_detected_object_set" [label="detected_object_set\n:: kwiver:detected_object_set",shape=house,rank=same,fontsize=10];
"sink_input_detected_object_set" -> "sink_main" [arrowhead=none,color=black];
"sink_input_image_file_name" [label="image_file_name\n:: kwiver:file_name",shape=house,rank=same,fontsize=10];
"sink_input_image_file_name" -> "sink_main" [arrowhead=none,color=black];


}

subgraph "cluster_yolo_v2" {
color=white;style=filled;fillcolor=white;

"yolo_v2_main" [label="yolo_v2\n:: image_object_detector",shape=ellipse,rank=same];

"yolo_v2_input_image" [label="image\n:: kwiver:image",shape=house,rank=same,fontsize=10];
"yolo_v2_input_image" -> "yolo_v2_main" [arrowhead=none,color=black];

"yolo_v2_output_detected_object_set" [label="detected_object_set\n:: kwiver:detected_object_set",shape=invhouse,rank=same,fontsize=10];
"yolo_v2_main" -> "yolo_v2_output_detected_object_set" [arrowhead=none,color=black];

}

"exp_output_image" -> "yolo_v2_input_image" [minlen=1,color=black,weight=1];
"yolo_v2_output_detected_object_set" -> "sink_input_detected_object_set" [minlen=1,color=black,weight=1];

}

Optical Flow

strict digraph "unnamed" {
clusterrank=local;

subgraph "cluster_exp" {
color=white;style=filled;fillcolor=white;

"exp_main" [label="exp\n:: diva_experiment",shape=ellipse,rank=same];


"exp_output_file_name" [label="file_name\n:: kwiver:file_name",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_file_name" [arrowhead=none,color=black];
"exp_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_image" [arrowhead=none,color=black];
"exp_output_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_timestamp" [arrowhead=none,color=black];

}

subgraph "cluster_flow" {
color=white;style=filled;fillcolor=white;

"flow_main" [label="flow\n:: optical_flow",shape=ellipse,rank=same];

"flow_input_image" [label="image\n:: kwiver:image",shape=house,rank=same,fontsize=10];
"flow_input_image" -> "flow_main" [arrowhead=none,color=black];
"flow_input_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=house,rank=same,fontsize=10];
"flow_input_timestamp" -> "flow_main" [arrowhead=none,color=black];

"flow_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"flow_main" -> "flow_output_image" [arrowhead=none,color=black];

}

subgraph "cluster_writer" {
color=white;style=filled;fillcolor=white;

"writer_main" [label="writer\n:: image_writer",shape=ellipse,rank=same];

"writer_input_image" [label="image\n:: kwiver:image",shape=house,rank=same,fontsize=10];
"writer_input_image" -> "writer_main" [arrowhead=none,color=black];
"writer_input_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=house,rank=same,fontsize=10];
"writer_input_timestamp" -> "writer_main" [arrowhead=none,color=black];


}

"exp_output_image" -> "flow_input_image" [minlen=1,color=black,weight=1];
"exp_output_timestamp" -> "flow_input_timestamp" [minlen=1,color=black,weight=1];
"flow_output_image" -> "writer_input_image" [minlen=1,color=black,weight=1];

}

Annotation Transfer and Visualization

strict digraph "unnamed" {
clusterrank=local;

subgraph "cluster_frame_list_input" {
color=white;style=filled;fillcolor=white;fontsize=10;rank=same;

"frame_list_input_main" [label="frames\n::frame_list_input",shape=ellipse];

"frame_list_file" [label="image_list_file",shape=box];
"frame_list_path" [label="path",shape=box];
"frame_list_image_reader_type" [label="image_reader:type",shape=box];
"frame_list_image_output" [label="image",shape=invhouse];

{ "frame_list_file" "frame_list_path" "frame_list_image_reader_type" } -> "frame_list_input_main" [arrowhead=none];
"frame_list_input_main" -> "frame_list_image_output" [arrowhead=none];
}

subgraph "cluster_cam2cam" {
color=white;style=filled;fillcolor=white;fontsize=10;rank=same;

"cam2cam_main" [label="camera2camera\n::detected_object_filter",shape=ellipse];

"cam2cam_src_cam" [label="src_camera_krtd_file_name",shape=box];
"cam2cam_dest_cam" [label="dest_camera_krtd_file_name",shape=box];
"cam2cam_detected_obj_set_in" [label="detected_object_set",shape=house]
"cam2cam_detected_obj_set_out" [label="detected_object_set",shape=invhouse];

{ "cam2cam_src_cam" "cam2cam_dest_cam" "cam2cam_detected_obj_set_in" } -> "cam2cam_main" [arrowhead=none];
"cam2cam_main" -> "cam2cam_detected_obj_set_out" [arrowhead=none];
}

subgraph "cluster_kpf_in" {
color=white;style=filled;fillcolor=white;fontsize=10;rank=same;

"kpf_in_main" [label="kpf_in\n::detected_object_input",shape=ellipse];

"kpf_in_file" [label="file_name",shape=box]
"kpf_in_reader_type" [label="reader:type",shape=box]
"kpf_in_detected_obj_set_out" [label="detected_object_set",shape=invhouse];

{ "kpf_in_file" "kpf_in_reader_type" } -> "kpf_in_main" [arrowhead=none];
"kpf_in_main" -> "kpf_in_detected_obj_set_out" [arrowhead=none];
}

subgraph "cluster_shift" {
color=white;style=filled;fillcolor=white;fontsize=10;rank=same;

"shift_main" [label="shift\n::shift_detected_object_set",shape=ellipse];

"shift_offset" [label="offset",shape=box];
"shift_detected_obj_set_in" [label="detected_object_set",shape=house]
"shift_detected_obj_set_out" [label="detected_object_set",shape=invhouse];

{ "shift_offset" "shift_detected_obj_set_in" } -> "shift_main" [arrowhead=none];
"shift_main" -> "shift_detected_obj_set_out" [arrowhead=none];
}

subgraph "cluster_draw" {
color=white;style=filled;fillcolor=white;fontsize=10;rank=same;

"draw_main" [label="draw\n::draw_detected_object_set",shape=ellipse];

"draw_image_in" [label="image",shape=house]
"draw_detected_obj_set_in" [label="detected_object_set",shape=house]
"draw_image_out" [label="image",shape=invhouse];

{ "draw_image_in" "draw_detected_obj_set_in" } -> "draw_main" [arrowhead=none];
"draw_main" -> "draw_image_out" [arrowhead=none];
}

subgraph "cluster_csv_out" {
color=white;style=filled;fillcolor=white;fontsize=10;rank=same;

"csv_out_main" [label="csv_out\n::detected_object_output",shape=ellipse];

"csv_out_file" [label="file_name",shape=box];
"csv_out_type" [label="writer:type",shape=box];
"csv_out_detected_obj_set_in" [label="detected_object_set",shape=house];
"csv_file_out" [label="csv file",shape=invtrapezium];

{ "csv_out_file" "csv_out_type" "csv_out_detected_obj_set_in" } -> "csv_out_main" [arrowhead=none];
"csv_out_main" -> "csv_file_out" [arrowhead=none];
}

subgraph "cluster_image_write" {
color=white;style=filled;fillcolor=white;fontsize=10;rank=same;

"image_write_main" [label="imagewriter\n::image_writer",shape=ellipse];

"image_write_file_template" [label="file_name_template",shape=box];
"image_write_type" [label="writer:type",shape=box];
"image_write_image_in" [label="image",shape=house];
"image_write_images_out" [label="images",shape=invtrapezium];

{ "image_write_file_template" "image_write_type" "image_write_image_in" } -> "image_write_main" [arrowhead=none];
"image_write_main" -> "image_write_images_out" [arrowhead=none];
}

"kpf_in_detected_obj_set_out" -> "shift_detected_obj_set_in" [minlen=1,weight=1];
"shift_detected_obj_set_out" -> "cam2cam_detected_obj_set_in" [minlen=1,weight=1];
"cam2cam_detected_obj_set_out" -> "draw_detected_obj_set_in" [minlen=1,weight=1];
"frame_list_image_output" -> "draw_image_in" [minlen=1,weight=1];
"cam2cam_detected_obj_set_out" -> "csv_out_detected_obj_set_in" [minlen=1,weight=1];
"draw_image_out" -> "image_write_image_in" [minlen=1,weight=1];
}

ZeroMQ Pipelines

Image Sender

strict digraph "unnamed" {
clusterrank=local;

subgraph "cluster_exp" {
color=white;style=filled;fillcolor=white;

"exp_main" [label="exp\n:: diva_experiment",shape=ellipse,rank=same];


"exp_output_file_name" [label="file_name\n:: kwiver:file_name",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_file_name" [arrowhead=none,color=black];
"exp_output_image" [label="image\n:: kwiver:image",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_image" [arrowhead=none,color=black];
"exp_output_timestamp" [label="timestamp\n:: kwiver:timestamp",shape=invhouse,rank=same,fontsize=10];
"exp_main" -> "exp_output_timestamp" [arrowhead=none,color=black];

}

subgraph "cluster_protobuf_ser" {
color=white;style=filled;fillcolor=white;

"protobuf_ser_main" [label="protobuf_ser\n:: serializer",shape=ellipse,rank=same];

"protobuf_ser_input_timg/file_name" [label="timg/file_name\n:: _flow_dependent/",shape=house,rank=same,fontsize=10];
"protobuf_ser_input_timg/file_name" -> "protobuf_ser_main" [arrowhead=none,color=black];
"protobuf_ser_input_timg/image" [label="timg/image\n:: _flow_dependent/",shape=house,rank=same,fontsize=10];
"protobuf_ser_input_timg/image" -> "protobuf_ser_main" [arrowhead=none,color=black];
"protobuf_ser_input_timg/timestamp" [label="timg/timestamp\n:: _flow_dependent/",shape=house,rank=same,fontsize=10];
"protobuf_ser_input_timg/timestamp" -> "protobuf_ser_main" [arrowhead=none,color=black];

"protobuf_ser_output_timg" [label="timg\n:: kwiver:serialized_message",shape=invhouse,rank=same,fontsize=10];
"protobuf_ser_main" -> "protobuf_ser_output_timg" [arrowhead=none,color=black];

}

subgraph "cluster_zmq_timg" {
color=white;style=filled;fillcolor=white;

"zmq_timg_main" [label="zmq_timg\n:: zmq_transport_send",shape=ellipse,rank=same];

"zmq_timg_input_serialized_message" [label="serialized_message\n:: kwiver:serialized_message",shape=house,rank=same,fontsize=10];
"zmq_timg_input_serialized_message" -> "zmq_timg_main" [arrowhead=none,color=black];


}

"exp_output_file_name" -> "protobuf_ser_input_timg/file_name" [minlen=1,color=black,weight=1];
"exp_output_image" -> "protobuf_ser_input_timg/image" [minlen=1,color=black,weight=1];
"exp_output_timestamp" -> "protobuf_ser_input_timg/timestamp" [minlen=1,color=black,weight=1];
"protobuf_ser_output_timg" -> "zmq_timg_input_serialized_message" [minlen=1,color=black,weight=1];

}