Timestamps with Python’s multiprocessing library

Small implementation of running three separate processes that can communicate with each other through messages. Each process has its own internal counter that will be updated with each event.

The following script is an example of how the multiprocessing scripts are running my flaON smart home system.

This script, as an example, will print a line for each event together with the updated internal counter and the time on the machine running the processes.

from multiprocessing import Process, Pipe
from os import getpid
from datetime import datetime

Then create some helper functions. The first one simply prints the local Lamport timestamp and the actual time on the machine executing the processes. Note that printing the ‘actual’ time doesn’t make sense in a real distributed system, since the local clocks won’t be synchronized with each other.

def local_time(counter):
	return ' (LAMPORT_TIME={}, LOCAL_TIME={})'.format(counter, datetime.now())

The second one calculates the new timestamp when a process receives a message. The function takes the maximum of the received timestamp and its local counter, and increments it with one.

def calc_recv_timestamp(recv_time_stamp, counter):
    return max(recv_time_stamp, counter) + 1

Next, we need to create a function for every event that may occur. In this example they are three events: event (any local event), message sent and message received. To make our code easy to read, the event functions will return the updated timestamp for the process where the event takes place.

The "event" event takes the local counter and the process id (pid), increments the counter by one, prints a line so we know the event took place and returns the incremented counter.

def event(pid, counter):
	counter += 1
	print('Something happened in {} !'.\
		format(pid) + local_time(counter))
	return counter

The send_message event also takes the pid and counter as input, but additionally requires a pipe. A Pipe is an object from the multiprocessing library that represents a two-way connection between two processes. Every pipe consists of two connection objects, one in each direction. To send or receive a message we need to call the send or recv function on these connection object.

The send_message event first increments the counter by one, then sends an actual message (content is not important here) and its incremented timestamp, and prints a short statement including the new local Lamport time and the actual time on the machine.

def send_message(pipe, pid, counter):
	counter += 1
	pipe.send(('Empty shell', counter))
	print('Message sent from ' + str(pid) + local_time(counter))
	return counter

The recv_message event takes the same three arguments as send_message. It receives both the actual message and the timestamp by invoking the recv function on the pipe. Then it calculates the new timestamp with our previously created calc_recv_timestamp function, and prints a line including the updated counter and the actual time on the machine.

def recv_message(pipe, pid, counter):
	message, timestamp = pipe.recv()
	counter = calc_recv_timestamp(timestamp, counter)
	print('Message received at ' + str(pid)  + local_time(counter))
	return counter

All set up to start creating the definitions of our three processes. Every process starts with getting its unique process id (this is the actual process id of the process running on our machine) and setting its own counter to 0. Then the counter gets updated by invoking the different event functions and passing the returned value to the counter.

def process_one(pipe12):
	pid = getpid()
	counter = 0
	counter = event(pid, counter)
	counter = send_message(pipe12, pid, counter)
	counter  = event(pid, counter)
	counter = recv_message(pipe12, pid, counter)
	counter  = event(pid, counter)

def process_two(pipe21, pipe23):
	pid = getpid()
	counter = 0
	counter = recv_message(pipe21, pid, counter)
	counter = send_message(pipe21, pid, counter)
	counter = send_message(pipe23, pid, counter)
	counter = recv_message(pipe23, pid, counter)

def process_three(pipe32):
	pid = getpid()
	counter = 0
	counter = recv_message(pipe32, pid, counter)
	counter = send_message(pipe32, pid, counter)

So far nothing happens if you execute the code. This is because none of the processes are actually created, let alone started. In the main part of this script we will create the two pipes (Pipe()) and three processes (Process()), needed to run our script successfully. To start the processes, we need to call start and join on each process. Join assures us that all processes will be completed before quitting.

if __name__ == '__main__':
	oneandtwo, twoandone = Pipe()
	twoandthree, threeandtwo = Pipe()
	
	process1 = Process(target=process_one, args=(oneandtwo,))
	process2 = Process(target=process_two, args=(twoandone, twoandthree))
	process3 = Process(target=process_three, args=(threeandtwo,))
	
	process1.start()
	process2.start()
	process3.start()
	
	process1.join()
	process2.join()
	process3.join()

Each event has indeed printed a line, including the type of event and the updated Lamport timestamp. Comparing the timestamps with stamps in this example, we'll notice they all agree with each other. New examples can be tested by drawing a timeline and change the process definitions accordingly.