[Date Prev][Date Next] [Chronological] [Thread] [Top]

(ITS#7894) LMDB: MDB_DUPSORT and mdb_cursor_put(.., MDB_CURRENT) can put DB in illegal state



Full_Name: Max Bolingbroke
Version: LMDB HEAD
OS: OS X
URL: ftp://ftp.openldap.org/incoming/
Submission from: (NULL) (149.254.58.67)


It is possible to insert a (key, value) pair into an MDB_DUPSORTed LMDB database
twice using mdb_cursor_put(..., MDB_CURRENT). If you attempt to insert the same
(key, value) pair twice with mdb_put then the second insert has no effect. This
seems like a bug in mdb_cursor_put.

Furthermore, you can use mdb_cursor_put(..., MDB_CURRENT) to insert duplicates
in the opposite of their correctly sorted order. This is certainly a bug.

The following C program demonstrates both issues:

#define _XOPEN_SOURCE 500
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdarg.h>
#include "lmdb.h"

/*
 * Print a printf-style message to stdout and terminate the process with
 * exit status 1.
 *
 * format: printf-style format string.
 * ...:    arguments consumed by the format string.
 *
 * Never returns.
 */
void panic(const char * restrict format, ...) {
	va_list argp;
	va_start(argp, format);
	vprintf(format, argp);
	va_end(argp);

	exit(1);
}

/*
 * Abort the program via panic() unless rc is 0.
 *
 * context: label printed in front of the error text (callers in this file
 *          pass the name of the LMDB call being checked).
 * rc:      return code to test; any non-zero value is reported using
 *          mdb_strerror(rc).
 */
void mdb_check(char *context, int rc) {
	if (rc == 0) {
		return;
	}

	panic("%s: %s", context, mdb_strerror(rc));
}

/*
 * Abort the program via panic() unless rc is exactly MDB_NOTFOUND.
 *
 * context: label printed in front of the error text.
 * rc:      return code to test; anything other than MDB_NOTFOUND
 *          (including 0) is reported using mdb_strerror(rc).
 */
void mdb_check_notfound(char *context, int rc) {
	if (rc == MDB_NOTFOUND) {
		return;
	}

	panic("%s: %s", context, mdb_strerror(rc));
}

/*
 * Reverse the order of the four low-order bytes of x.
 *
 * The swap is unconditional, so it only yields a big-endian encoding when
 * the host is little-endian. The arithmetic is done on unsigned int so that
 * moving a byte into the top position never left-shifts a signed value into
 * the sign bit (undefined behaviour in C).
 *
 * NOTE(review): assumes int is 32 bits wide, as the masks imply.
 */
int BE(int x) {
	unsigned int u = (unsigned int)x;
	unsigned int swapped = (u & 0x000000FFu) << 24 |
	                       (u & 0x0000FF00u) << 8 |
	                       (u & 0x00FF0000u) >> 8 |
	                       (u & 0xFF000000u) >> 24;

	return (int)swapped;
}

int main(int argc, char **argv) {
	srand(1337);

	MDB_e * *env;
	mdb_check("mdb_env_create", mdb_env_create(&env));
	mdb_check("mdb_env_set_maxdbs", mdb_env_set_maxdbs(env, 1));
	mdb_check("mdb_env_set_mapsize", mdb_env_set_mapsize(env, 1024 * 1024));
	mdb_check("mdb_env_open", mdb_env_open(env, "testdb", MDB_NOSUBDIR, 0644));

	MDB_txn *txn;
	mdb_check("mdb_txn_begin", mdb_txn_begin(env, NULL, 0, &txn));

	MDB_dbi dbi;
	mdb_check("mdb_dbi_open", mdb_dbi_open(txn, "root", MDB_CREATE | MDB_DUPSORT,
&dbi));

	int key;
	int value;

	MDB_val keyBuffer;
	keyBuffer.mv_size = sizeof(int);
	keyBuffer.mv_data = &key;

	MDB_val valueBuffer;
	valueBuffer.mv_size = sizeof(int);
	valueBuffer.mv_data = &value;

	{
		key = BE(2); value = BE(200); 
		mdb_check("mdb_put", mdb_put(txn, dbi, &keyBuffer, &valueBuffer, 0));

		key = BE(2); value = BE(300);
		mdb_check("mdb_put", mdb_put(txn, dbi, &keyBuffer, &valueBuffer, 0));

		// Inserting the same (key, value) pair twice: second insert is ignored
		key = BE(2); value = BE(300);
		mdb_check("mdb_put", mdb_put(txn, dbi, &keyBuffer, &valueBuffer, 0));
	}

	{
		MDB_cursor *cursor;
		mdb_check("mdb_cursor_open", mdb_cursor_open(txn, dbi, &cursor));

		size_t count;

		mdb_check("mdb_cursor_get", mdb_cursor_get(cursor, &keyBuffer, &valueBuffer,
MDB_FIRST))0D0D
		mdb_check("mdb_cursor_count", mdb_cursor_count(cursor, &count));
		if (count != 2) panic("count != 2");

		if (0) {
			// Inserting the same (key, value) pair twice: second insert is not ignored.
			// While this is different to mdb_put, I can't tell from the docs whether
this is a bug or not.
			keyBuffer.mv_size = sizeof(int); valueBuffer.mv_size = sizeof(int);
			keyBuffer.mv_data = &key; valueBuffer.mv_data = &value;
			key = BE(2); value = BE(300);
			mdb_check("mdb_cursor_put2,2C mdb_cursor_put(cursor, &keyBuffer,
&valueBuffer, MDB_CURRENT));
			mdb_check("mdb_cursor_count", mdb_cursor_count(cursor, &count));
			if (count != 1) panic("count != 1");
		} else {
			// This on the other hand is definitely a bug: after doing this the
duplicates are not sorted.
			keyBuffer.mv_size = sizeof(int); valueBuffer.mv_size = sizeof(int);
			keyBuffer.mv_data = &key; valueBuffer.mv_data = &value;
			key = BE(2); value = BE(400);
			mdb_check("mdb_cursor_put", mdb_cursor_p(c8cursor, &keyBuffer, &valueBuffer,
MDB_CURRENT));
			mdb_check("mdb_cursor_count", mdb_cursor_count(cursor, &count));
			if (count != 2) panic("count != 2");

			mdb_check("mdb_cursor_get", mdb_cursor_get(cursor, &keyBuffer, &valueBuffer,
MDB_FIRST));
			printf("%d\n", BE(*((int*)valueBuffer.mv_data)));

			mdb_check("mdb_cursor_get", mdb_cursor_get(cursor, &keyBuffer, &valueBuffer,
MDB_NEXT));
			printf("%d\n", BE(*((int*)valueBuffer.mv_data)));

			mdb_check_notfound("mdb_cursor_get", mdb_cursor_get(cursor, &keyBuffer,
&valueBuffer, MDB_NEXT));
		}

		mdb_cursor_close(cursor);
	}
	
	mdb_check("mdb_txn_commit", mdb_txn_commit(txn));

	return 0;
}