CS 3700 - Networks and Distributed Systems
Project 5: Distributed, Replicated Key-Value Store
This project is due at 11:59pm on December 7, 2017.
Description
In this project, you will build a (relatively) simple, distributed, replicated key-value datastore. A key-value
datastore is a very simple type of database that supports two API calls from clients: put(key, value) and get(key).
The former allows a client application to store a key-value pair in the database, while the latter allows a client
to retrieve a previously stored value by supplying its key. Real-world examples of distributed key-value datastores
include memcached, Redis, and DynamoDB.
Of course, it would be simple to build a key-value store if it were a single process. However, your system must be replicated
and support strong consistency guarantees. Thus, you will implement a simplified version of the Raft consensus protocol.
Multiple copies of your datastore will run in parallel, and they will use the Raft protocol to maintain consensus among the replicas.
Your datastore will be tested for both correctness and performance. We will provide a testbed for your datastore that will
simulate clients who execute put() and get() commands, as well as an unreliable network that can drop
packets or make hosts unavailable. Part of your grade will come from the overhead your system has (i.e., fewer packets
will result in a higher score), while another part will depend on the speed at which your datastore answers client queries
(i.e., the query latency). Your results will be compared to your classmates' via a leaderboard.
Language and Libraries
You may write your code in whatever language you choose, as long as your code compiles and runs on the command line on
unmodified CCIS Linux machines. Do not use libraries that are not installed by default on the CCIS Linux machines. You may
use IDEs (e.g., Eclipse) during development, but do not turn in your IDE project without a Makefile. Make sure your code has
no dependencies on your IDE.
You may not use libraries or modules that implement consensus protocols. This includes any library that implements Raft,
Paxos, Viewstamped Replication, or similar protocols. Obviously, you cannot use any libraries or software packages that
implement a replicated key-value datastore. For example, your program cannot be a thin wrapper around memcached, etc.
You may use libraries or modules that implement local database storage (e.g. SQLite, BerkeleyDB, LevelDB) if you want
to use them for persistent storage within each replica. If you have any questions about whether a particular library
or module is allowed, post on Piazza.
Your Program
For this project, you will submit one program named 3700kvstore that implements the replicated datastore. You may use any
language of your choice, and we will give you basic starter code in Python. Keep in mind that you are writing a program that
will be run multiple times, in parallel, to form a distributed system. Thus, 3700kvstore will bear some conceptual
similarities to your 3700bridge program from Project 2.
If you use C or any other compiled language, your executable should be named 3700kvstore. If you use an interpreted language,
your script should be called 3700kvstore and be marked as executable. If you use a virtual machine-based language (like
Java or C#), you must write a brief Bash shell script, named 3700kvstore, that conforms to the input syntax given below
and then launches your program using whatever incantations are necessary. For example, if you write your solution in
Java, your Bash script might resemble
#!/bin/bash
# Forward all command-line arguments (your ID and the other replica IDs) to the JVM.
exec java -jar 3700kvstore.jar "$@"
Or, if you use Python, your script might start with
#!/usr/bin/python
from bar import foo
and should be marked executable (e.g., chmod +x 3700kvstore).
Starter Code
Very basic Python starter code for the assignment, as well as the testbed scripts, is available in /course/cs3700f17/code/project5.
To get started, you should copy this directory into your own local directory (i.e., cp -r /course/cs3700f17/code/project5 ~/),
since you will need the run.py and test.py scripts, as well as the example configuration files.
The starter code provides a bare-bones implementation of a datastore that simply connects to the LAN and broadcasts a "no-op"
message once every second. You may use this code as a basis for your project if you wish, but it is strongly recommended
that you do not do so unless you are comfortable with Python.
Testing Your Code
In order to evaluate your replicated datastore, we have provided a simulated test environment. The simulator will create
an emulated network and all necessary sockets, execute several copies of your datastore with the appropriate command
line arguments, route messages between the datastore replicas, and generate requests from clients. This script is included
in the starter code, and you can run it by executing
./run.py <config-file>
where <config-file> is the configuration file that describes the test configuration you would like to use. Note that
you will not need to modify the run script, or parse the config files (the run script parses the config files). You may
create custom configs to test your code under different scenarios if you want to.
Config File Format
The configuration file that you pass to ./run.py contains a number of parameters that control the simulation. The file is
formatted in JSON and has the following elements:
- lifetime (Required) The number of seconds the simulation should run for. Must be at least 5.
- replicas (Required) The number of replicas to execute, i.e., copies of your 3700kvstore program. Must be at least 3.
- requests (Required) The number of get() and put() requests to randomly generate from clients.
- mix (Optional) Float between 0 and 1 representing the fraction of client queries that are get()s. Defaults to 0.8.
- wait (Optional) The number of seconds to wait before sending any client requests. Defaults to 2 seconds.
- seed (Optional) The random seed to choose. If not specified, a random value is chosen. Setting this value will allow
  for a semi-reproducible set of clients and requests.
- drops (Optional) Float between 0 and 1 representing the fraction of messages between replicas to drop. Defaults to 0.
- events (Optional) A list of events that will occur during the simulation. Each event has a type and a time when it
  will trigger:
  - type (Required) The type of event. Valid types are:
    - kill_non_leader: will crash-fail a random non-leader replica.
    - kill_leader: will crash-fail the current leader.
    - part_easy: partition the network such that the leader has a quorum.
    - part_hard: partition the network such that the leader does not have a quorum.
    - part_end: remove all network partitions.
  - time (Required) The timestamp, in seconds, when the event should occur.
For example, a simple configuration with no events and a read-heavy workload might look like the following:
{
"lifetime": 30,
"replicas": 5,
"requests": 100,
"mix": 0.9
}
and a more complex configuration with events and a lossy network might be:
{
"lifetime": 30,
"replicas": 5,
"requests": 100,
"mix": 0.7,
"drops": 0.1,
"events": [{"type": "kill_non_leader", "time": 10},
{"type": "kill_leader", "time": 20}]
}
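If you write custom configs, a small script can generate them and sanity-check the documented limits before you hand them
to ./run.py. This is a minimal sketch: the helper names make_config and write_config are hypothetical, while the field
names and limits come from the list above. Optional fields are only written when set, so run.py still applies its own defaults.

```python
import json

# Event types accepted in the "events" list, per the config documentation.
VALID_EVENTS = ("kill_non_leader", "kill_leader", "part_easy", "part_hard", "part_end")


def make_config(lifetime, replicas, requests, mix=None, drops=None, seed=None, events=None):
    """Build a run.py config dict, checking the documented limits (hypothetical helper)."""
    if lifetime < 5:
        raise ValueError("lifetime must be at least 5 seconds")
    if replicas < 3:
        raise ValueError("replicas must be at least 3")
    config = {"lifetime": lifetime, "replicas": replicas, "requests": requests}
    # Optional fractions are only included when set; otherwise run.py uses its defaults.
    for name, value in (("mix", mix), ("drops", drops)):
        if value is not None:
            if not 0 <= value <= 1:
                raise ValueError("%s must be between 0 and 1" % name)
            config[name] = value
    if seed is not None:
        config["seed"] = seed
    if events:
        for event in events:
            if event.get("type") not in VALID_EVENTS:
                raise ValueError("unknown event type: %r" % event.get("type"))
            if "time" not in event:
                raise ValueError("every event needs a time")
        config["events"] = events
    return config


def write_config(path, config):
    """Write the config as JSON so it can be passed to ./run.py."""
    with open(path, "w") as f:
        json.dump(config, f, indent=2)
```

For example, write_config("test-partition.json", make_config(30, 5, 100, drops=0.05, events=[{"type": "part_hard", "time": 10},
{"type": "part_end", "time": 20}])) followed by ./run.py test-partition.json would exercise a partition in which the leader
loses its quorum and later recovers (the filename is only an example).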
./run.py Output
The ./run.py script will output any errors it encounters during the simulation, including malformed messages, messages to
unknown destinations, replicas that unexpectedly quit, etc. Once the simulation completes, ./run.py prints some statistics
about your datastore's performance:
bash$ ./run.py config.json
...
Simulation finished.
Leaders: 0 1 3 4
Replicas that died/were killed: 0/1
Total messages sent: 6730
Total messages dropped: 183
Total client get()/put() requests: 60/40
Total duplicate responses: 11
Total unanswered get()/put() requests: 4/3
Total redirects: 9
Total get()/put() failures: 15/31
Total get() with incorrect response: 60
Average/mean query latency: 1.2sec/0.9sec
Ideally, you would like all get() and put() requests to succeed without failing and for them to have low latency. Obviously,
if your system is returning incorrect values to get() requests then your datastore has consistency issues. Furthermore, you
would like the total number of packets to be as low as possible, i.e. the overhead of your datastore on the network should
be low.
Testing Script
Additionally, we have included a basic test script that runs your code under a variety of different configurations and also
checks your code's compatibility with the grading script. If your code fails in the test script we provide, you can be
assured that it will fare poorly when run under the grading script. To run the test script, simply type
bash$ ./test.py
Basic tests (5 replicas, 30 seconds, 100 requests):
No drops, no failures, 80% read [PASS]
No drops, no failures, 60% read [PASS]
No drops, no failures, 40% read [PASS]
No drops, no failures, 20% read [PASS]
Unreliable network tests (5 replicas, 30 seconds, 150 requests):
10% drops, no failures, 80% read [FAIL]
...
This will run your datastore on a number of configurations, and will output whether your program performs sufficiently.
If you wish to run one of the tests manually, you can do so with
bash$ ./run.py test-whatever.json
Message Format
To simplify this project, instead of using real packet formats, we will be sending our data across the wire in JSON (many
languages have utilities to encode and decode JSON, and you are welcome to use these libraries). All messages must be
encoded as a dictionary, and they must include the following four keys (at a minimum):
- src - The ID of the source of the message.
- dst - The ID of the destination of the message.
- leader - The ID of the leader, or "FFFF" if the leader's ID is unknown.
- type - The type of the message.
The simulator uses src and dst instead of IP addresses in order to route and deliver messages. Furthermore, the simulator
supports multicast: if dst is set to "FFFF", the message will be delivered to all replicas (use multicast sparingly, since
it is expensive).
leader is the ID of the replica that the sender of the message believes is the leader. All messages must include the
leader so that the simulator can learn which replica is the leader (otherwise, the simulator would have no way of
determining this information).
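Since every message must carry the same four keys, it can help to build and encode messages in one place. A minimal
Python sketch, assuming JSON-encoded dictionaries as described above; make_msg, encode, decode, and BROADCAST are
hypothetical names, not part of the starter code.

```python
import json

BROADCAST = "FFFF"  # multicast dst, and also the "leader unknown" value


def make_msg(src, dst, leader, msg_type, **fields):
    """Build a message dict with the four required keys plus any custom fields."""
    msg = {"src": src, "dst": dst, "leader": leader, "type": msg_type}
    msg.update(fields)
    return msg


def encode(msg):
    """Serialize a message for the LAN socket, refusing ones missing a required key."""
    for key in ("src", "dst", "leader", "type"):
        if key not in msg:
            raise ValueError("message is missing required key %r" % key)
    return json.dumps(msg).encode("utf-8")


def decode(data):
    """Parse bytes received from the LAN socket back into a dict."""
    return json.loads(data.decode("utf-8"))
```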
type describes the type of the message. You may define custom types in order to implement your datastore (and
you may add custom keys to the message dictionary in these cases). However, there are several message types that your
replicas must support in order to handle requests from clients.
- get - get() messages are read requests from clients. They have the following format:
{"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "get", "MID": "<a unique string>",
"key": "<some key>"}
Your replicas may respond with an OK message that includes the corresponding value:
{"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>",
"value": "<value of the key>"}
Or your replicas may respond with a failure message, in which case the client will retry the get():
{"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}
If the client issues a get() for a key that does not exist (i.e., it was never put()), your datastore should return an
empty value (i.e., an empty string).
- put - put() messages are write requests from clients. They have the following format:
{"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "put", "MID": "<a unique string>",
"key": "<some key>", "value": "<value of the key>"}
Your replicas may respond with an OK message if the write was successful:
{"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "ok", "MID": "<a unique string>"}
Or your replicas may respond with a failure message, in which case the client will retry the put():
{"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "fail", "MID": "<a unique string>"}
- redirect - If the client sends any message (get() or put()) to a replica that is not the leader, it should
respond with a redirect:
{"src": "<ID>", "dst": "<ID>", "leader": "<ID>", "type": "redirect", "MID": "<a unique string>"}
In this case, the client will retry the request by sending it to the specified leader.
Note that in all of the above cases, the MID in a request must match the MID in the response. For example, the following
would be a legal series of requests and responses, where 001A is a client and 0000 and 0001 are replicas:
Request 1 {"src": "001A", "dst": "0001", "leader": "FFFF",
"type": "get", "MID": "4D61ACF83027", "key": "name"}
Response 1 {"src": "0001", "dst": "001A", "leader": "0000",
"type": "redirect", "MID": "4D61ACF83027"}
Request 2 {"src": "001A", "dst": "0000", "leader": "0000",
"type": "get", "MID": "9AB4CE50023", "key": "name"}
Response 2 {"src": "0000", "dst": "001A", "leader": "0000", "type": "ok",
"MID": "9AB4CE50023", "value": "Christo Wilson"}
Again, you will need to develop additional, custom message types in order to implement the Raft consensus protocol. As long
as your messages include the four minimum required fields (src, dst, leader, type), the simulator will ensure that your messages
are delivered.
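The choice between ok, fail, and redirect can be sketched as a single function that always echoes the request's MID. This
minimal Python sketch assumes the leader answers directly from an in-memory dictionary with no replication yet (roughly
the early stage suggested under Implementing Raft). The name handle_client is hypothetical, and replying fail when no
leader is known (e.g., during an election) is a design choice rather than a requirement.

```python
def handle_client(msg, my_id, leader_id, store):
    """Return the response dict for a client get() or put() (hypothetical helper).

    my_id: this replica's ID; leader_id: the replica this one believes is the
    leader, or "FFFF" if unknown; store: dict holding the key/value pairs.
    """
    # The response always echoes the request's MID and reports the believed leader.
    resp = {"src": my_id, "dst": msg["src"], "leader": leader_id, "MID": msg["MID"]}
    if leader_id == "FFFF":
        # No known leader: a redirect would be useless, so ask the client to retry.
        resp["type"] = "fail"
    elif leader_id != my_id:
        # Not the leader: the client will resend to the ID in "leader".
        resp["type"] = "redirect"
    elif msg["type"] == "get":
        resp["type"] = "ok"
        # Keys that were never put() read as the empty string.
        resp["value"] = store.get(msg["key"], "")
    elif msg["type"] == "put":
        # Simplification: a real Raft leader replies only after the entry commits.
        store[msg["key"]] = msg["value"]
        resp["type"] = "ok"
    else:
        resp["type"] = "fail"
    return resp
```

Replaying the exchange above, replica 0001 (believing 0000 is the leader) would answer Request 1 with a redirect carrying
MID 4D61ACF83027 and "leader": "0000".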
Command Line Specification
The command line syntax for your 3700kvstore program is given below. The simulator will pass parameters to each replica
representing (1) the ID of the replica, and (2) the IDs of all other replicas in the system. The syntax for launching
your datastore is therefore:
./3700kvstore <your ID> <ID of second replica> [ID3 [ID4 ...]]
For simplicity, all replica IDs are unique four-digit hexadecimal numbers (e.g., 0AA1 or F29A). You will use these IDs as
the src and dst in your messages. Clients will also be assigned unique IDs by the simulator.
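Parsing the command line is straightforward; the sketch below also computes the majority size that Raft elections and
commits depend on. parse_args, quorum_size, and REPLICA_ID are hypothetical names; the ID check follows the
four-hex-digit rule stated above.

```python
import re

# Replica IDs are unique four-digit hexadecimal numbers, e.g. 0AA1 or F29A.
REPLICA_ID = re.compile(r"[0-9A-Fa-f]{4}\Z")


def parse_args(argv):
    """Return (my_id, other_ids) from sys.argv-style arguments (hypothetical helper)."""
    if len(argv) < 3:
        raise SystemExit("usage: %s <your ID> <ID of second replica> [ID3 [ID4 ...]]" % argv[0])
    for replica_id in argv[1:]:
        if not REPLICA_ID.match(replica_id):
            raise SystemExit("malformed replica ID: %r" % replica_id)
    return argv[1], list(argv[2:])


def quorum_size(other_ids):
    """Votes needed for a majority of all replicas (the others plus this one)."""
    return (len(other_ids) + 1) // 2 + 1
```

In the program itself you would call my_id, peers = parse_args(sys.argv); with the 5 replicas used in the basic tests,
quorum_size(peers) is 3.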
Connecting to the LAN
We will be using UNIX domain sockets to emulate a LAN. Each of your replicas will connect to a single domain socket (the
way a server would connect to a single Ethernet cable). A replica will send and receive all messages over this socket
(i.e., messages to/from other replicas, as well as messages to/from clients). Your program should read from the socket
constantly to make sure it receives all messages (they will be buffered if you don't read immediately). The simulator
will take care of routing all sent messages to the appropriate destinations; thus, it's okay if you're not intimately
familiar with how domain sockets work, or with how the simulator works.
Each replica should connect to a domain socket named ID (no quotes), where ID is the replica's ID (i.e., the first ID it
receives on the command line). We will be using the SOCK_SEQPACKET socket type, which provides reliable,
connection-oriented delivery that preserves message boundaries. Note that unlike Project 2, you do not need to pad the
name of the domain sockets with \0.
Exactly how to connect to a UNIX domain socket depends on your programming language. For example, if you were using Perl
to complete the project, your code for connecting would look like:
use IO::Socket::UNIX;
my $lan = IO::Socket::UNIX->new(
Type => SOCK_SEQPACKET,
Peer => "<lan>"
);
You can then read from and write to the $lan variable. In Python, your code would look like
from socket import socket, SOCK_SEQPACKET, AF_UNIX
s = socket(AF_UNIX, SOCK_SEQPACKET)
s.connect('<lan>')
with similar results.
We encourage you to write your code in an event-driven style using select() or poll(). This will keep your code single-threaded
and will make debugging your code significantly easier. Alternatively, you can implement your datastore in a threaded
model, but expect it to be significantly more difficult to debug.
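A select()-based loop can combine reading from the LAN socket with the timers Raft needs. This Python sketch assumes one
socket carrying JSON messages and a randomized election timeout; the 150-300 ms range and the receive buffer size are
assumptions to tune, and run, new_deadline, and the callback names are hypothetical.

```python
import json
import random
import select
import time

# Election timeout range in seconds: an assumed value to tune, not a spec requirement.
ELECTION_TIMEOUT = (0.15, 0.30)
RECV_SIZE = 65536  # assumed; must exceed your largest message or SEQPACKET truncates it


def new_deadline():
    """Return a randomized absolute time at which to start an election."""
    return time.time() + random.uniform(*ELECTION_TIMEOUT)


def run(sock, on_message, on_timeout, max_iterations=None):
    """Single-threaded select() loop over the LAN socket (hypothetical helper).

    on_message(msg) handles one decoded message and may return a new deadline
    (e.g., after a leader's keepalive) or None to keep the current one;
    on_timeout() is called once the deadline passes and returns the next deadline.
    """
    deadline = new_deadline()
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        # Block until a message arrives or the current deadline is reached.
        wait = max(0.0, deadline - time.time())
        readable, _, _ = select.select([sock], [], [], wait)
        if readable:
            data = sock.recv(RECV_SIZE)
            if not data:
                break  # the other end closed the socket
            result = on_message(json.loads(data.decode("utf-8")))
            if result is not None:
                deadline = result
        if time.time() >= deadline:
            deadline = on_timeout()
```

The same deadline can double as the leader's heartbeat timer: a leader's on_timeout would send empty AppendEntries and
return a deadline shorter than the followers' election timeout.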
Datastore Requirements and Assumptions
The goal of your system is to accept put()s from clients and retrieve the corresponding data when a get() is issued. To ensure
that data is not lost when a process crashes, all data from clients must be replicated, which then raises the dueling
issues of how to maintain consistency and achieve high availability. To meet these goals, your datastore will implement
the Raft consensus protocol. Ultimately, your datastore should achieve the following two goals:
- Consistency - clients should always receive correct answers to get() requests.
- Availability - clients should be able to execute put() and get() requests at any time with low latency (i.e. your
system should execute requests quickly).
Raft is a complicated protocol, and real-world datastores are extremely complicated artifacts. To simplify this project,
there are several things you do not need to implement:
- True persistence - you do not need to write client updates to disk, or worry about committing data to permanent storage.
All of the data from clients and the log of updates can live in memory.
- Garbage collection - Raft maintains a log of all updates. In a real system, this log periodically needs to be garbage
collected, since it cannot grow infinitely long. However, your system will not be running for long periods of
time, and therefore you do not need to worry about garbage collection.
- Restarts - in a real system, replicas might fail for a while then come back online, necessitating snapshots and reconciliation.
However, you may assume that replicas in the simulator will crash fail, i.e. they will die completely and never
return.
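Because there are no restarts and no garbage collection, the log and state machine can be plain in-memory structures. A
minimal sketch, assuming 1-based log indices as in the Raft paper; the class and field names are hypothetical. Entries
are applied to the dictionary strictly in log order, and only once committed.

```python
class Entry(object):
    """One log entry: the leader's term when it was created plus a client put()."""

    def __init__(self, term, key, value, client, mid):
        self.term = term
        self.key = key
        self.value = value
        self.client = client  # client ID, so the leader can reply once committed
        self.mid = mid        # MID of the put(), echoed in the "ok" response


class ReplicaState(object):
    """In-memory Raft log and key/value state machine (hypothetical names)."""

    def __init__(self):
        self.log = []          # Raft log index i lives at self.log[i - 1]
        self.commit_index = 0  # highest log index known to be committed
        self.last_applied = 0  # highest log index applied to self.store
        self.store = {}        # the state machine: key -> value

    def apply_committed(self):
        """Apply newly committed entries in log order; return the entries applied."""
        applied = []
        while self.last_applied < self.commit_index:
            self.last_applied += 1
            entry = self.log[self.last_applied - 1]
            self.store[entry.key] = entry.value
            applied.append(entry)
        return applied

    def read(self, key):
        """get() semantics: keys that were never put() read as the empty string."""
        return self.store.get(key, "")
```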
Implementing Raft
The Raft paper is specifically designed to be easy to read. To implement the protocol, you should definitely start by
reading the paper. Additional papers and resources are available on the Raft GitHub. I would suggest the following
series of steps to begin working on your datastore implementation:
1. Add basic support for responding to client get() and put() requests. At this point, you can respond to all requests
with a "type": "fail" message.
2. Implement the Raft election protocol (Section 5.2 of the Raft paper); add the ability to respond to get() and
put() requests with "type": "redirect" messages.
3. Add a timeout to detect leader failures (i.e., if you don't hear from the leader in X milliseconds...) and make
sure that the new election proceeds correctly.
4. Implement a basic, empty version of the AppendEntries RPC call that doesn't replicate any data, but acts as a
keepalive message from the leader to other replicas to prevent unnecessary elections.
5. Implement the transaction log and the "state machine" (i.e., a dictionary containing the key/value pairs from
clients, Section 5.3). Don't bother replicating the transactions; just ensure that the leader is able to
correctly answer get() and put() requests.
6. Improve your AppendEntries RPC call to actually send data to replicas. Ensure that updates are only committed
when a quorum is in agreement.
7. Add support for retrying failed commits and test it by experimenting with lossy network simulations.
8. If you haven't already, modify the leader election to support the additional restrictions in Section 5.4.1; test
your implementation on lossy networks with failed leaders.
9. Implement the subtle commit restriction given in Section 5.4.2.
10. Test, test, test, and test some more ;)
Step 6 will probably require the most time in terms of writing code and debugging, since it is the crux of the algorithm.
Implementing steps 7-9 is necessary to ensure correctness of the protocol, but shouldn't be too difficult.
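The correctness checks from Sections 5.3, 5.4.1, and 5.4.2 reduce to a few small functions. This Python sketch treats
the log as a 1-indexed list of entry dicts that carry at least a "term" key (convenient because entries travel as JSON);
the function names and arguments are hypothetical, and RPC plumbing, term updates, and the follower's own commit-index
update are left out.

```python
def last_log_info(log):
    """Return (last log index, last log term) for a 1-indexed log of entry dicts."""
    if not log:
        return 0, 0
    return len(log), log[-1]["term"]


def candidate_up_to_date(cand_last_index, cand_last_term, log):
    """Section 5.4.1: vote only for a candidate whose log is at least as up-to-date."""
    my_index, my_term = last_log_info(log)
    if cand_last_term != my_term:
        return cand_last_term > my_term  # a later last term wins
    return cand_last_index >= my_index   # same last term: the longer log wins


def append_entries(log, prev_index, prev_term, entries):
    """Follower side of AppendEntries (Section 5.3); mutates log, returns success."""
    if prev_index > len(log):
        return False  # we are missing entries; the leader must back up
    if prev_index > 0 and log[prev_index - 1]["term"] != prev_term:
        return False  # log-matching check failed at prev_index
    for offset, entry in enumerate(entries):
        index = prev_index + offset + 1
        if index <= len(log):
            if log[index - 1]["term"] == entry["term"]:
                continue  # already have this entry; keep later ones too
            del log[index - 1:]  # conflict: drop it and everything after it
        log.append(entry)
    return True


def advance_commit(log, match_index, commit_index, current_term, cluster_size):
    """Leader side: new commit index per the majority rule and Section 5.4.2.

    match_index maps each follower ID to the highest index known replicated there.
    """
    for n in range(len(log), commit_index, -1):
        if log[n - 1]["term"] != current_term:
            break  # terms never increase going backwards, so no earlier n qualifies
        replicated = 1 + sum(1 for m in match_index.values() if m >= n)  # +1 for leader
        if replicated * 2 > cluster_size:
            return n
    return commit_index
```

For example, a leader would call advance_commit after each successful AppendEntries reply, then apply any newly committed
entries and send ok responses to the clients waiting on them.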
Performance Testing
15% of your grade on this project will come from performance. Your project will be graded against the submissions of your
peers. To help you know how you're doing, the testing script will report total packets sent and average request latency
for several of the harder tests to a central database. Note that, by default, only reasonable scores are sent to the
database; if your scores aren't showing up, that means the tests are not passing, or you need to improve your performance.
In order to see how your project ranks, you can run
bash$ /course/cs3700f17/bin/project5/printstats
----- TEST: 20% drops, 2 replica failure, 20% read -----
Least overhead:
1: choffnes 200 packets
2: foo 220 packets
Lowest query latency:
1: foo 1.00000
2: choffnes 1.15000
which will print out the rank of each group for each performance test, ranked separately by the number of packets sent
and by query latency. In this particular example, choffnes's project has lower overhead but answers queries more slowly.
Ideally, you would like to have both lower latency and fewer total packets sent.
Submitting Your Project
If you have not done so already, register yourself for our grading system using the following command:
$ /course/cs3700f17/bin/register-student [NUID]
NUID is your Northeastern ID number, including any leading zeroes.
Before turning in your project, you and your partner(s) must register your group. To register yourself in a group, execute
the following script:
$ /course/cs3700f17/bin/register project5 [team name]
This will either report back success or will give you an error message. If you have trouble registering, please contact the
course staff.
You and your partner(s) must all run this script with the same
[team name]. This is how we know you are part of the same group.
To turn in your project, you should submit your (thoroughly documented) code along with two other files:
- A Makefile that compiles your code. Your Makefile may be blank, but it must exist.
- A plain-text (no Word or PDF) README file. In this file, you should briefly describe your high-level approach, any
challenges you faced, and an overview of how you tested your code.
Your README, Makefile, source code, etc. should all be placed in a directory. You submit your project by running the turn-in
script as follows:
$ /course/cs3700f17/bin/turnin project5 [project directory]
[project directory] is the name of the directory with your submission. The script will print out every file that you are
submitting, so make sure that it prints out all of the files you wish to submit! The turn-in script will not accept submissions
that are missing a README or a Makefile.
Only one group member needs to submit your project. Your group may submit as many times as you wish; only the
last submission will be graded, and the time of the last submission will determine whether your assignment is late.
Grading
This project is worth 14% of your final grade. The final grading in this project will consist of
- 85% Program correctness (based on passing test cases)
- 15% Performance
At a minimum, your code must pass the test suite without errors or crashes, and it must obey the requirements specified above.
All student code will be scanned by plagiarism detection software to ensure that students are not copying code from the
Internet or each other.