CS 3700 - Networks and Distributed Systems

Project 5: Distributed, Replicated Key-Value Store

This project is due at 11:59pm on December 7, 2017.


In this project, you will build a (relatively) simple, distributed, replicated key-value datastore. A key-value datastore is a very simple type of database that supports two API calls from clients: put(key, value) and get(key). The former API allows a client application to store a key-value pair in the database, while the latter API allows a client to retrieve a previously stored value by supplying its key. Real-world examples of distributed key-value datastores include memcached, Redis, DynamoDB, etc.

Of course, it would be simple to build a key-value store if it was a single process. However, your system must be replicated and support strong consistency guarantees. Thus, you will be implementing a simplified version of the Raft consensus protocol. Your datastore will be run multiple times, in parallel, and it will use the Raft protocol to maintain consensus among the replicas.

Your datastore will be tested for both correctness and performance. We will provide a testbed for your datastore that will simulate clients who execute put() and get() commands, as well as an unreliable network that can drop packets or make hosts unavailable. Part of your grade will come from the overhead your system has (i.e., fewer packets will result in a higher score), while another part will depend on the speed at which your datastore answers client queries (i.e. what is the query latency). Your results will be compared to your classmates' via a leaderboard.

Language and Libraries

You may write your code in whatever language you choose, as long as your code compiles and runs on unmodified CCIS Linux machines on the command line. Do not use libraries that are not installed by default on the CCIS Linux machines. You may use IDEs (e.g. Eclipse) during development, but do not turn in your IDE project without a Makefile. Make sure your code has no dependencies on your IDE.

You may not use libraries or modules that implement consensus protocols. This includes any library that implements Raft, Paxos, Replicated View-state, or similar protocols. Obviously, you cannot use any libraries or software packages that implement a replicated key-value datastore. For example, your program cannot be a thin wrapper around memcached, etc. You may use libraries or modules that implement local database storage (e.g. SQLite, BerkeleyDB, LevelDB) if you want to use them for persistent storage within each replica. If you have any questions about whether a particular library or module is allowed, post on Piazza.

Your Program

For this project, you will submit one program named 3700kvstore that implements the replicated datastore. You may use any language of your choice, and we will give you basic starter code in Python. Keep in mind that you are writing a program that will be run multiple times, in parallel, to form a distributed system. Thus, 3700kvstore will bear some conceptual similarities to your 3700bridge program from Project 2.

If you use C or any other compiled language, your executable should be named 3700kvstore. If you use an interpreted language, your script should be called 3700kvstore and be marked as executable. If you use a virtual machine-based language (like Java or C#), you must write a brief Bash shell script, named 3700kvstore, that conforms to the input syntax given below and then launches your program using whatever incantations are necessary. For example, if you write your solution in Java, your Bash script might resemble

#!/bin/bash
exec java -jar 3700kvstore.jar "$@"
Or, if you use Python, your script might start with
#!/usr/bin/env python
from bar import foo
and should be marked executable.

Starter Code

Very basic starter code for the assignment in Python, as well as the testbed scripts, is available in /course/cs3700f17/code/project5. To get started, you should copy this directory into your own local directory (i.e., cp -r /course/cs3700f17/code/project5 ~/), since you will need the run.py and test.py scripts, as well as the example configuration files.

The starter code provides a bare-bones implementation of a datastore that simply connects to the LAN and broadcasts a "no-op" message once every second. You may use this code as a basis for your project if you wish, but it is strongly recommended that you do not do so unless you are comfortable with Python.

Testing Your Code

In order to evaluate your replicated datastore, we have provided a simulated test environment. The simulator will create an emulated network and all necessary sockets, execute several copies of your datastore with the appropriate command line arguments, route messages between the datastore replicas, and generate requests from clients. This script is included in the starter code, and you can run it by executing
./run.py <config-file>
where <config-file> is the configuration file that describes the test configuration you would like to use. Note that you will not need to modify the run script, or parse the config files (the run script parses the config files). You may create custom configs to test your code under different scenarios if you want to.

Config File Format

The configuration file that you pass to ./run.py contains a number of parameters that control the simulation. The file is formatted in JSON. For example, a simple configuration with no events and a read-heavy workload might look like the following
{
    "lifetime": 30,
    "replicas": 5,
    "requests": 100,
    "mix": 0.9
}
and a more complex configuration with events and a lossy network might be
{
    "lifetime": 30,
    "replicas": 5,
    "requests": 100,
    "mix": 0.7,
    "drops": 0.1,
    "events": [{"type": "kill_non_leader", "time": 10},
               {"type": "kill_leader", "time": 20}]
}
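Custom configurations can also be generated with a short script rather than written by hand. The following is a minimal sketch that uses only the keys shown in the examples above; the helper name make_config is hypothetical, and treating "drops" and "events" as optional is an assumption based on the first example omitting them.

```python
import json

def make_config(lifetime, replicas, requests, mix, drops=None, events=None):
    """Build a config dictionary using only the keys shown in the examples."""
    config = {
        "lifetime": lifetime,   # seconds, as in the examples
        "replicas": replicas,
        "requests": requests,
        "mix": mix,
    }
    # Assumption: these two keys may be left out, as in the simple example.
    if drops is not None:
        config["drops"] = drops
    if events:
        config["events"] = events
    return config

if __name__ == "__main__":
    lossy = make_config(30, 5, 100, 0.7, drops=0.1,
                        events=[{"type": "kill_leader", "time": 20}])
    # Print the JSON; redirect it to a file to pass to ./run.py.
    print(json.dumps(lossy, indent=4))
```

Redirecting the printed output to a file gives a config that can be passed to ./run.py in the usual way.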

./run.py Output

The ./run.py script will output any errors it encounters during the simulation, including malformed messages, messages to unknown destinations, replicas that unexpectedly quit, etc. Once the simulation completes, ./run.py prints some statistics about your datastore's performance:
bash$ ./run.py config.json
Simulation finished.
Leaders: 0 1 3 4
Replicas that died/were killed: 0/1
Total messages sent: 6730
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 11
Total unanswered get()/put() requests: 4/3
Total redirects: 9
Total get()/put() failures: 15/31
Total get() with incorrect response: 60
Average/mean query latency: 1.2sec/0.9sec
Ideally, you would like all get() and put() requests to succeed without failing and for them to have low latency. Obviously, if your system is returning incorrect values to get() requests then your datastore has consistency issues. Furthermore, you would like the total number of packets to be as low as possible, i.e. the overhead of your datastore on the network should be low.

Testing Script

Additionally, we have included a basic test script that runs your code under a variety of different configurations and also checks your code's compatibility with the grading script. If your code fails in the test script we provide, you can be assured that it will fare poorly when run under the grading script. To run the test script, simply type
bash$ ./test.py
Basic tests (5 replicas, 30 seconds, 100 requests):
	No drops, no failures, 80% read		[PASS]
	No drops, no failures, 60% read		[PASS]
	No drops, no failures, 40% read		[PASS]
	No drops, no failures, 20% read		[PASS]
Unreliable network tests (5 replicas, 30 seconds, 150 requests):
	10% drops, no failures, 80% read	[FAIL]
This will run your datastore on a number of configurations, and will output whether your program performs sufficiently. If you wish to run one of the tests manually, you can do so with
bash$ ./run.py test-whatever.json

Message Format

To simplify this project, instead of using real packet formats, we will be sending our data across the wire in JSON (many languages have utilities to encode and decode JSON, and you are welcome to use these libraries). All messages must be encoded as a dictionary, and they must include the following four keys (at a minimum): src, dst, leader, and type. The simulator uses src and dst instead of IP addresses in order to route and deliver messages. Furthermore, the simulator supports multicast: if dst is set to "FFFF", the message will be delivered to all replicas (use multicast sparingly, since it is expensive). leader is the ID of the replica that the sender of the message believes is the leader. All messages must include the leader so that the simulator can learn which replica is the leader (otherwise, the simulator would have no way of determining this information).

type describes the type of the message. You may define custom types in order to implement your datastore (and you may add custom keys to the message dictionary in these cases). However, there are several message types that your replicas must support in order to handle requests from clients.

Note that in all of the above cases, the MID in a request must match the MID in the response. For example, the following would be a legal series of requests and responses, where 001A is a client and 0000 and 0001 are replicas:
Request 1     {"src": "001A", "dst": "0001", "leader": "FFFF",
               "type": "get", "MID": "4D61ACF83027", "key": "name"}
Response 1    {"src": "0001", "dst": "001A", "leader": "0000",
               "type": "redirect", "MID": "4D61ACF83027"}
Request 2     {"src": "001A", "dst": "0000", "leader": "0000",
               "type": "get", "MID": "9AB4CE50023", "key": "name"}
Response 2    {"src": "0000", "dst": "001A", "leader": "0000", "type": "ok",
               "MID": "9AB4CE50023", "value": "Christo Wilson"}
Again, you will need to develop additional, custom message types in order to implement the Raft consensus protocol. As long as your messages include the four minimum required fields (src, dst, leader, type), the simulator will ensure that your messages are delivered.
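The exchange above can be sketched in Python as follows. This is only an illustration, not the starter code: the helper names make_message and handle_get are hypothetical, the store dictionary stands in for the replica's state machine, and returning an empty string for an unknown key is an assumption. The sketch also ignores replication entirely; it only shows how the required keys and the MID are carried from request to response.

```python
import json

def make_message(src, dst, leader, msg_type, **extra):
    """Build a message with the four required keys plus any custom keys."""
    msg = {"src": src, "dst": dst, "leader": leader, "type": msg_type}
    msg.update(extra)
    return msg

def handle_get(my_id, leader_id, store, request):
    """Answer a client get(), echoing the request's MID in the response."""
    if leader_id != my_id:
        # This replica does not believe it is the leader: point the client
        # at the replica it believes is the leader.
        return make_message(my_id, request["src"], leader_id, "redirect",
                            MID=request["MID"])
    # Assumption: an unknown key yields an empty string.
    value = store.get(request["key"], "")
    return make_message(my_id, request["src"], leader_id, "ok",
                        MID=request["MID"], value=value)

if __name__ == "__main__":
    request = {"src": "001A", "dst": "0001", "leader": "FFFF",
               "type": "get", "MID": "4D61ACF83027", "key": "name"}
    print(json.dumps(handle_get("0001", "0000", {}, request)))
```

Run against Request 1 from the example, a non-leader replica 0001 that believes 0000 is the leader produces the same redirect as Response 1.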

Command Line Specification

The command line syntax for your 3700kvstore program is given below. The simulator will pass parameters to each replica representing (1) the ID of the replica, and (2) the IDs of all other replicas in the system. The syntax for launching your datastore is therefore:
./3700kvstore <your ID> <ID of second replica> [ID3 [ID4 ...]]
For simplicity, all replica IDs are unique four-digit hexadecimal numbers (e.g., 0AA1 or F29A). You will use these IDs as the src and dst in your messages. Clients will also be assigned unique IDs by the simulator.
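A replica might read these arguments as in the sketch below. The function names parse_args and quorum_size are hypothetical; the quorum calculation assumes that the cluster consists of exactly this replica plus the IDs listed after it.

```python
import sys

USAGE = "usage: ./3700kvstore <your ID> <ID of second replica> [ID3 [ID4 ...]]"

def parse_args(argv):
    """Split argv into this replica's ID and the IDs of the other replicas."""
    if len(argv) < 3:
        raise SystemExit(USAGE)
    return argv[1], argv[2:]

def quorum_size(peer_ids):
    """Smallest majority of the full cluster (this replica plus its peers)."""
    cluster_size = len(peer_ids) + 1
    return cluster_size // 2 + 1
```

A program would typically call parse_args(sys.argv) once at startup and keep the resulting IDs for use as src and dst values.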

Connecting to the LAN

We will be using UNIX domain sockets to emulate a LAN. Each of your replicas will connect to a single domain socket (the way a server would connect to a single Ethernet cable). A replica will send and receive all messages over this socket (i.e. messages to/from other replicas, as well as messages to/from clients). Your program should be constantly reading from the socket to make sure it receives all messages (they will be buffered if you don't read immediately). The simulator will take care of routing all sent messages to the appropriate destinations; thus, it's okay if you're not intimately familiar with how Domain Sockets work, or with how the simulator works.

Each replica should connect to a Domain Socket named "ID" (no-quotes), where ID is the replica's ID (i.e. the first ID it receives on the command line). We will be using the SOCK_SEQPACKET socket type, which provides a reliable message-oriented stream. Note that unlike Project 2, you do not need to pad the name of the domain sockets with \0.

Exactly how to connect to a UNIX domain socket depends on your programming language. For example, if you were using Perl to complete the project, your code for connecting would look like:

use IO::Socket::UNIX;
use Socket qw(SOCK_SEQPACKET);
my $lan = IO::Socket::UNIX->new(
    Type => SOCK_SEQPACKET,
    Peer => "<lan>"
);
You can then read and write from the $lan variable. In Python, your code would look like
from socket import socket, SOCK_SEQPACKET, AF_UNIX
s = socket(AF_UNIX, SOCK_SEQPACKET)
s.connect('<lan>')
with similar results.

We encourage you to write your code in an event-driven style using select() or poll(). This will keep your code single-threaded and will make debugging your code significantly easier. Alternatively, you can implement your datastore in a threaded model, but expect it to be significantly more difficult to debug.
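An event-driven replica built around select() might be organized as in the following sketch. It is not the starter code: the function names, the 32768-byte receive buffer, and the 50 ms tick are assumptions chosen for illustration, and the sketch assumes that each recv() returns exactly one complete JSON message, which relies on SOCK_SEQPACKET preserving message boundaries.

```python
import json
import select
import socket
import time

def connect(my_id):
    """Connect to the domain socket named after this replica's ID."""
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
    sock.connect(my_id)
    return sock

def poll_once(sock, timeout, bufsize=32768):
    """Wait up to `timeout` seconds; return one decoded message or None."""
    ready, _, _ = select.select([sock], [], [], timeout)
    if not ready:
        return None          # nothing arrived before the timeout
    raw = sock.recv(bufsize)
    if not raw:
        return None          # the other end closed the socket
    return json.loads(raw.decode("utf-8"))

def run(sock, on_message, on_tick, tick=0.05):
    """Single-threaded loop: handle a message if one arrived, then run timers."""
    while True:
        msg = poll_once(sock, tick)
        if msg is not None:
            on_message(msg)
        # Timers (election timeout, heartbeats) are checked on every pass.
        on_tick(time.time())
```

Because select() returns after at most one tick even when no message arrives, timeouts can be checked in on_tick without any extra threads.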

Datastore Requirements and Assumptions

The goal of your system is to accept put()s from clients and retrieve the corresponding data when a get() is issued. To ensure that data is not lost when a process crashes, all data from clients must be replicated, which then raises the dueling issues of how to maintain consistency and achieve high-availability. To meet these goals, your datastore will implement the Raft consensus protocol. Ultimately, your datastore should achieve the following two goals:
  1. Consistency - clients should always receive correct answers to get() requests.
  2. Availability - clients should be able to execute put() and get() requests at any time with low latency (i.e. your system should execute requests quickly).
Raft is a complicated protocol, and real-world datastores are extremely complicated artifacts. To simplify this project, there are several things you do not need to implement:

Implementing Raft

The Raft paper is specifically designed to be easy to read. To implement the protocol you should definitely start by reading the paper. Additional papers and resources are available on the Raft Github. I would suggest the following series of steps to begin working on your datastore implementation:
  1. Add basic support for responding to client get() and put() requests. At this point, you can respond to all requests with a "type": "fail" message.
  2. Implement the Raft election protocol (section 5.2 of the Raft paper); add the ability to respond to get() and put() requests with "type": "redirect" messages.
  3. Add a timeout to detect leader failures (i.e. if you don't hear from the leader in X milliseconds...) and make sure that the new election proceeds correctly.
  4. Implement a basic, empty version of the AppendEntries RPC call that doesn't replicate any data, but acts as a keepalive message from the leader to other replicas to prevent unnecessary elections.
  5. Implement the transaction log and the "state machine" (i.e. a dictionary containing the key/value pairs from clients, Section 5.3). Don't bother replicating the transactions, just ensure that the leader is able to correctly answer get() and put() requests.
  6. Improve your AppendEntries RPC call to actually send data to replicas. Ensure that updates are only committed when a quorum is in agreement.
  7. Add support for retrying failed commits and test it by experimenting with lossy network simulations.
  8. If you haven't already, modify the leader election to support the additional restrictions in Section 5.4.1; test your implementation on lossy networks with failed leaders.
  9. Implement the subtle commit restriction given in Section 5.4.2.
  10. Test, test, test, and test some more ;)
Step 6 will probably require the most time in terms of writing code and debugging, since it is the crux of the algorithm. Implementing steps 7-9 is necessary to ensure correctness of the protocol, but shouldn't be too difficult.
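The quorum check in step 6 and the commit restriction in step 9 can be sketched together as below. This is one possible way to structure the leader's bookkeeping, not a required design: the function names, the match_index dictionary (follower ID to the highest log index known to be stored on that follower), and the 1-based log indexing are all assumptions for illustration.

```python
def majority_match_index(match_index, leader_last_index):
    """Highest log index that is stored on a majority of the cluster.

    match_index maps each follower ID to the highest index known to be
    replicated there; the leader's own log counts toward the majority.
    """
    indexes = sorted(list(match_index.values()) + [leader_last_index],
                     reverse=True)
    quorum = len(indexes) // 2 + 1
    # The quorum-th largest value is held by at least `quorum` replicas.
    return indexes[quorum - 1]

def advance_commit_index(commit_index, match_index, log, current_term):
    """Return the new commit index for the leader.

    log is a list of entries, each a dict with a "term" key; the entry at
    index i (1-based) is log[i - 1].
    """
    candidate = majority_match_index(match_index, len(log))
    # Section 5.4.2: only an entry from the leader's current term may be
    # committed by counting replicas. Terms never decrease along the log,
    # so checking the highest majority index is sufficient.
    if candidate > commit_index and log[candidate - 1]["term"] == current_term:
        return candidate
    return commit_index
```

For example, with five replicas where the leader holds 5 entries and the followers hold 5, 3, 2, and 0, the highest index stored on a majority is 3.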

Performance Testing

15% of your grade on this project will come from performance. Your project will be graded against the submissions of your peers. To help you know how you're doing, the testing script will report total packets sent and average request latency for several of the harder tests to a central database. Note that, by default, only reasonable scores are sent to the database; if your scores aren't showing up, that means the tests are not passing, or you need to improve your performance.

In order to see how your project ranks, you can run

bash$ /course/cs3700f17/bin/project5/printstats
----- TEST: 20% drops, 2 replica failure, 20% read -----
Least overhead:
1: choffnes                         200 packets
2: foo                              220 packets

Lowest query latency:
1: foo                              1.00000
2: choffnes                         1.15000
which will print out the rank of each group for each performance test, divided into the number of packets sent and the query latency. In this particular example, choffnes's project has lower overhead but answers queries more slowly. Obviously, you would ideally like to have both lower latency and fewer total packets sent.

Submitting Your Project

If you have not done so already, register yourself for our grading system using the following command:
$ /course/cs3700f17/bin/register-student [NUID]
NUID is your Northeastern ID number, including any leading zeroes.

Before turning in your project, you and your partner(s) must register your group. To register yourself in a group, execute the following script:

$ /course/cs3700f17/bin/register project5 [team name]
This will either report back success or will give you an error message. If you have trouble registering, please contact the course staff. You and your partner(s) must all run this script with the same [team name]. This is how we know you are part of the same group.

To turn in your project, you should submit your (thoroughly documented) code along with two other files: a README and a Makefile.

Your README, Makefile, source code, etc. should all be placed in a directory. You submit your project by running the turn-in script as follows:
$ /course/cs3700f17/bin/turnin project5 [project directory]
[project directory] is the name of the directory with your submission. The script will print out every file that you are submitting, so make sure that it prints out all of the files you wish to submit! The turn-in script will not accept submissions that are missing a README or a Makefile. Only one group member needs to submit your project. Your group may submit as many times as you wish; only the last submission will be graded, and the time of the last submission will determine whether your assignment is late.


This project is worth 14% of your final grade. At a minimum, your code must pass the test suite without errors or crashes, and it must obey the requirements specified above. All student code will be scanned by plagiarism detection software to ensure that students are not copying code from the Internet or each other.