/*
Copyright (c) 2013, 2014 IOnU Security Inc. All rights reserved.
Copyright (c) 2015, 2016 Sequence Logic, Inc.
Created August 2013 by Kendrick Webster
K2Daemon/MongoNotifyQueue.cpp - implementation for MongoNotifyQueue.h
*/

#include <string>
#include <list>
#include "MongoNotifyQueue.h"

namespace
{
// queue of operations, used for deferring write operations that are attempted during transient connection problems
// (i.e. when a replica set primary goes down and MongoDB hasn't yet elected a new primary)
typedef enum
{
    OP_UPDATE,
    OP_UPDATE_ALL,
    OP_DELETE
}
nqueue_operation_t;

struct CNqueueOp
{
    nqueue_operation_t op;
    std::string s1;
    std::string s2;

    CNqueueOp(nqueue_operation_t o, const char* s, const char* t = "") : op(o), s1(s), s2(t) {}
};

std::list<CNqueueOp> queue;
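
// Applies pUpdate to the first document matching pQuery in the notification-queue collection
// (MONGOC_UPDATE_NONE, so at most one document is modified). Logs and returns FAILURE on any
// driver error, SUCCESS otherwise.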
success_t doActualUpdate(mongoc_client_t *pConn, const bson_t *pQuery, const bson_t *pUpdate)
{
    success_t r = SUCCESS;
    mongoc_collection_t *pCollection = mongoc_client_get_collection(pConn, MongoDB::nqueue::DATABASE_NAME, MongoDB::nqueue::COLLECTION_NAME);
    if (pCollection == NULL)
    {
        r = FAILURE;
        logprintf(Log::error, "Cannot find collection '%s' in database '%s'", MongoDB::nqueue::COLLECTION_NAME, MongoDB::nqueue::DATABASE_NAME);
    }
    else
    {
        bson_error_t err;
        if (!mongoc_collection_update(pCollection, MONGOC_UPDATE_NONE, pQuery, pUpdate, NULL, &err))
        {
            logprintf(Log::error, "mongoc_collection_update: \"%s\"", err.message);
            r = FAILURE;
        }
        mongoc_collection_destroy(pCollection);
    }
    return r;
}
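
// Sets the "type" field ({ "$set": { "type": type } }) of the queue document whose _id equals id.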
success_t doUpdate(mongoc_client_t* connection, const char* id, const char* type)
{
    success_t r = SUCCESS;
    bson_t *pQuery = BCON_NEW("_id", BCON_UTF8(id));
    bson_t *pUpdate = BCON_NEW("$set", "{", "type", BCON_UTF8(type), "}");
    r = doActualUpdate(connection, pQuery, pUpdate);
    if (pQuery != NULL)
        bson_destroy(pQuery);
    if (pUpdate != NULL)
        bson_destroy(pUpdate);
    return r;
}
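
// Sets the "type" field of the queue document whose "device" field matches.
// NOTE: doActualUpdate() passes MONGOC_UPDATE_NONE, which updates at most one matching document;
// if the intent of UpdateAllDeviceRecords() is to change every record for the device, the update
// would need the MONGOC_UPDATE_MULTI_UPDATE flag instead.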
static success_t doUpdateAll(mongoc_client_t* connection, const char* device, const char* type)
{
    success_t r = SUCCESS;
    bson_t *pQuery = BCON_NEW("device", BCON_UTF8(device));
    bson_t *pUpdate = BCON_NEW("$set", "{", "type", BCON_UTF8(type), "}");
    r = doActualUpdate(connection, pQuery, pUpdate);
    if (pQuery != NULL)
        bson_destroy(pQuery);
    if (pUpdate != NULL)
        bson_destroy(pUpdate);
    return r;
}
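
// Removes (at most) the single queue document whose _id equals id.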
success_t doDelete(mongoc_client_t* connection, const char* id)
{
    success_t r = SUCCESS;
    bson_t *pDoc = BCON_NEW("_id", BCON_UTF8(id));
    mongoc_collection_t *pCollection = mongoc_client_get_collection(connection, MongoDB::nqueue::DATABASE_NAME, MongoDB::nqueue::COLLECTION_NAME);
    if (pCollection == NULL)
    {
        r = FAILURE;
        logprintf(Log::error, "Cannot find collection '%s' in database '%s'", MongoDB::nqueue::COLLECTION_NAME, MongoDB::nqueue::DATABASE_NAME);
    }
    else
    {
        bson_error_t err;
        //logprintf(Log::informational, "Remove single doc");
        if (!mongoc_collection_remove(pCollection, MONGOC_REMOVE_SINGLE_REMOVE, pDoc, NULL, &err))
        {
            logprintf(Log::error, "mongoc_collection_remove: \"%s\"", err.message);
            r = FAILURE;
        }
        mongoc_collection_destroy(pCollection);
    }
    if (pDoc != NULL)
        bson_destroy(pDoc);
    //logprintf(Log::informational, "Return from delete: %d", (int)r);
    return r;
}
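
// Drains the deferral queue in FIFO order. Stops early when MONGODB_FLUSH_TIME_LIMIT_MILLISECONDS
// is exceeded or when an operation fails; in either case the unfinished entry stays at the head of
// the queue so a later flush can retry it. Entries with an unknown opcode are logged and discarded.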
void doFlushQueue(mongoc_client_t* connection)
{
    unsigned int n = 0;
    CStopWatch stopwatch;
    while (!queue.empty())
    {
        if (MONGODB_FLUSH_TIME_LIMIT_MILLISECONDS <= stopwatch.Milliseconds())
        {
            logprintf(Log::debug, "stopping flush due to time limit (%d ms), queue size = %u, nodes processed = %u, elapsed milliseconds = %0.3lf",
                      MONGODB_FLUSH_TIME_LIMIT_MILLISECONDS, (unsigned int)queue.size(), n, stopwatch.Microseconds() / 1000.0);
            break;
        }
        else
        {
            CNqueueOp& op = queue.front();
            if (OP_UPDATE == op.op)
            {
                if (FAILURE == doUpdate(connection, op.s1.c_str(), op.s2.c_str()))
                {
                    break;
                }
            }
            else if (OP_UPDATE_ALL == op.op)
            {
                if (FAILURE == doUpdateAll(connection, op.s1.c_str(), op.s2.c_str()))
                {
                    break;
                }
            }
            else if (OP_DELETE == op.op)
            {
                if (FAILURE == doDelete(connection, op.s1.c_str()))
                {
                    break;
                }
            }
            else
            {
                logprintf(Log::critical, "encountered unknown opcode(%d) while flushing deferral queue for MongoDB notification queue", op.op);
            }
        }
        ++n;
        queue.pop_front();
    }
}
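
// Logging wrapper around doFlushQueue(): reports the queue size before the flush and the number
// of entries processed afterwards.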
void flushQueue(mongoc_client_t* connection)
{
    if (!queue.empty())
    {
        unsigned int n = (unsigned int)queue.size();
        logprintf(Log::debug, "flushing deferral queue, size = %u", n);
        doFlushQueue(connection);
        unsigned int n2 = (unsigned int)queue.size();
        logprintf(Log::debug, "after flush, defer queue size = %u, nodes processed = %u", n2, n - n2);
    }
}
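
// Copies the optional UTF-8 fields (device, type, info, expires, time) of the cursor's current
// document into *p; fields absent from the document are left unchanged.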
void parseNqueueDoc(MongoDB::nqueue::doc_t* p, mongoc_cursor_t* cursor)
{
    uint32_t nLen = 0;
    bson_iter_t iter;
    const bson_t *pCursorBson = mongoc_cursor_current(cursor);
    if (bson_iter_init_find(&iter, pCursorBson, "device") &&
        (bson_iter_utf8(&iter, &nLen) != NULL))
        p->device = bson_iter_utf8(&iter, &nLen);
    if (bson_iter_init_find(&iter, pCursorBson, "type") &&
        (bson_iter_utf8(&iter, &nLen) != NULL))
        p->type = bson_iter_utf8(&iter, &nLen);
    if (bson_iter_init_find(&iter, pCursorBson, "info") &&
        (bson_iter_utf8(&iter, &nLen) != NULL))
        p->info = bson_iter_utf8(&iter, &nLen);
    if (bson_iter_init_find(&iter, pCursorBson, "expires") &&
        (bson_iter_utf8(&iter, &nLen) != NULL))
        p->expires = bson_iter_utf8(&iter, &nLen);
    if (bson_iter_init_find(&iter, pCursorBson, "time") &&
        (bson_iter_utf8(&iter, &nLen) != NULL))
        p->time = bson_iter_utf8(&iter, &nLen);
}
} // anonymous namespace
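
// Returns a singly linked list (doc_t::next) of up to 100 queue entries whose "type" field matches,
// or NULL if there are none or the query could not be completed. try_again is set to true when the
// caller should retry later (deferred writes pending, transient outage, or an unfinished cursor);
// it is never cleared here, so callers should initialize it to false.
//
// Hypothetical usage sketch (assumes the caller owns the returned nodes and deletes them;
// handleNotification() is a made-up consumer, not part of this module):
//
//     bool try_again = false;
//     MongoDB::nqueue::doc_t* p = MongoDB::nqueue::GetByType(MongoDB::nqueue::TYPE_K2, try_again);
//     while (p != NULL)
//     {
//         MongoDB::nqueue::doc_t* next = p->next;
//         handleNotification(*p);
//         delete p;
//         p = next;
//     }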
MongoDB::nqueue::doc_t * MongoDB::nqueue::GetByType(const char* type, bool& try_again)
{
    if (!queue.empty())
    {
        try_again = true;
        logprintf(Log::informational, "MongoDB notification queue GetByType(%s): deferred operation queue is not empty", type);
        return NULL;
    }
    mongoc_client_t* connection = MongoDB::GetConnection();
    if (NULL == connection)
    {
        if (MongoDB::IsTemporaryOutage())
        {
            try_again = true;
            logprintf(Log::informational, "MongoDB notification queue GetByType(%s): transient connection failure", type);
        }
        else
        {
            logprintf(Log::critical, "MongoDB notification queue GetByType(%s): not connected", type);
        }
        return NULL;
    }
    bson_t *pQuery = BCON_NEW("type", BCON_UTF8(type));
    mongoc_collection_t *pCollection = mongoc_client_get_collection(connection, MongoDB::nqueue::DATABASE_NAME, MongoDB::nqueue::COLLECTION_NAME);
    mongoc_cursor_t *pCursor = mongoc_collection_find(pCollection, MONGOC_QUERY_NONE, 0, 100, 0, pQuery, NULL, NULL);
    MongoDB::nqueue::doc_t *p = NULL, *q = NULL, *r = NULL;
    bool first_try = true;
    bool any_success = false;
    const bson_t *pDoc;
retry_query:
    while (mongoc_cursor_next(pCursor, &pDoc))
    {
        any_success = true;
        bson_iter_t iter;
        if (bson_iter_init_find(&iter, mongoc_cursor_current(pCursor), "_id"))
        {
            p = new MongoDB::nqueue::doc_t;
            if (NULL == q)
            {
                r = p;
            }
            else
            {
                q->next = p;
            }
            q = p;
            p->next = NULL;
            uint32_t nLen = 0;
            p->id = bson_iter_utf8(&iter, &nLen);
            parseNqueueDoc(p, pCursor);
        }
    }
    bson_error_t cursorErr;
    bool bCursorErr = mongoc_cursor_error(pCursor, &cursorErr);
    if (!any_success && bCursorErr)
    {
        if (first_try)
        {
            first_try = false;
            logprintf(Log::error, "mongoc_cursor_next: \"%s\"", cursorErr.message);
            if (SUCCESS == MongoDB::Reconnect())
            {
                goto retry_query;
            }
        }
        else
        {
            try_again = true;
            logprintf(Log::error, "mongoc_cursor_next (after re-connect): \"%s\"", cursorErr.message);
        }
    }
    if (bCursorErr && !try_again)
    {
        try_again = true;
        logprintf(Log::informational, "MongoDB notification queue GetByType(%s): operation not finished: \"%s\"",
                  type, cursorErr.message);
    }
    mongoc_cursor_destroy(pCursor);
    mongoc_collection_destroy(pCollection);
    bson_destroy(pQuery);
    return r;
}
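
// Sets the "type" field of the record identified by doc.id. During a transient outage, or while
// earlier writes are still deferred, the operation is appended to the deferral queue (preserving
// ordering) and flushed once the connection is usable again.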
void MongoDB::nqueue::Update(MongoDB::nqueue::doc_t& doc)
{
    const char* id = doc.id.c_str();
    const char* type = doc.type.c_str();
    mongoc_client_t* connection = MongoDB::GetConnection();
    if (NULL == connection)
    {
        if (MongoDB::IsTemporaryOutage())
        {
            logprintf(Log::debug, "MongoDB notification queue Update(): deferring id(%s) --> type(%s) due to transient connection problem", id, type);
            queue.emplace_back(OP_UPDATE, id, type);
        }
        else
        {
            logprintf(Log::critical, "MongoDB notification queue Update(): not connected");
        }
    }
    else if (!queue.empty())
    {
        queue.emplace_back(OP_UPDATE, id, type);
        flushQueue(connection);
    }
    else
    {
        if (FAILURE == doUpdate(connection, id, type))
        {
            logprintf(Log::warning, "MongoDB notification queue Update(): deferring id(%s) --> type(%s) due to an error", id, type);
            queue.emplace_back(OP_UPDATE, id, type);
        }
    }
}
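
// Sets the "type" field of the record(s) for the given device, deferring the write on transient
// connection problems just as Update() does (see the single-document caveat on doUpdateAll()).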
void MongoDB::nqueue::UpdateAllDeviceRecords(const char* device, const char* type)
{
    mongoc_client_t* connection = MongoDB::GetConnection();
    if (NULL == connection)
    {
        if (MongoDB::IsTemporaryOutage())
        {
            logprintf(Log::debug, "MongoDB notification queue UpdateAllDeviceRecords(): deferring device(%s) --> type(%s) due to transient connection problem", device, type);
            queue.emplace_back(OP_UPDATE_ALL, device, type);
        }
        else
        {
            logprintf(Log::critical, "MongoDB notification queue UpdateAllDeviceRecords(): not connected");
        }
    }
    else if (!queue.empty())
    {
        queue.emplace_back(OP_UPDATE_ALL, device, type);
        flushQueue(connection);
    }
    else
    {
        if (FAILURE == doUpdateAll(connection, device, type))
        {
            logprintf(Log::warning, "MongoDB notification queue UpdateAllDeviceRecords(): deferring device(%s) --> type(%s) due to an error", device, type);
            queue.emplace_back(OP_UPDATE_ALL, device, type);
        }
    }
}
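
// Removes the record identified by id, deferring the delete on transient connection problems or
// while earlier writes are still queued.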
void MongoDB::nqueue::Delete(const char* id)
{
    mongoc_client_t* connection = MongoDB::GetConnection();
    if (NULL == connection)
    {
        if (MongoDB::IsTemporaryOutage())
        {
            logprintf(Log::debug, "MongoDB notification queue Delete(): deferring id(%s) due to transient connection problem", id);
            queue.emplace_back(OP_DELETE, id);
        }
        else
        {
            logprintf(Log::critical, "MongoDB notification queue Delete(): not connected");
        }
    }
    else if (!queue.empty())
    {
        queue.emplace_back(OP_DELETE, id);
        flushQueue(connection);
    }
    else
    {
        if (FAILURE == doDelete(connection, id))
        {
            logprintf(Log::warning, "MongoDB notification queue Delete(): deferring id(%s) due to an error", id);
            queue.emplace_back(OP_DELETE, id);
        }
    }
}
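
// Periodic hook: if deferred operations are pending and a connection is available, flush them.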
void MongoDB::nqueue::Timer(void)
{
    if (!queue.empty())
    {
        mongoc_client_t* connection = MongoDB::GetConnection();
        if (NULL != connection)
        {
            logprintf(Log::debug, "flushing deferral queue via timer");
            flushQueue(connection);
        }
    }
}

#ifdef TEST_MONGODB // ------------------------------------------------------
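
// The test helpers below use std::chrono and snprintf; these headers are included here in case
// they are not already pulled in via MongoNotifyQueue.h.
#include <chrono>
#include <cstdio>

// TEST_DATA is an X-macro table: each Y(...) entry becomes one inserted document and each
// X(key, value) line becomes one field appended to that document (see the X/Y definitions in
// AddTestData() below).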
#define TEST_DATA \
    Y( \
        X("_id", "0123-4567-89AB-CDEF-00001") \
        X("device", "TEST.path.to.device1") \
        X("type", MongoDB::nqueue::TYPE_K2) \
        X("info", "wakeup") \
        X("expires", getTestDeviceExpiry()) \
        X("time", getTestDeviceTime()) \
    ) \
    Y( \
        X("_id", "0123-4567-89AB-CDEF-00002") \
        X("device", "TEST.path.to.device2") \
        X("type", MongoDB::nqueue::TYPE_K2) \
        X("info", "wakeup") \
        X("expires", getTestDeviceExpiry()) \
        X("time", getTestDeviceTime()) \
    ) \
    Y( \
        X("_id", "0123-4567-89AB-CDEF-00003") \
        X("device", "TEST.path.to.device3") \
        X("type", MongoDB::nqueue::TYPE_K2) \
        X("info", "wakeup") \
        X("expires", getTestDeviceExpiry()) \
        X("time", getTestDeviceTime()) \
    ) \
    Y( \
        X("_id", "0123-4567-89AB-CDEF-00004") \
        X("device", "TEST.path.to.device4") \
        X("type", MongoDB::nqueue::TYPE_K2) \
        X("info", "wakeup") \
        X("expires", getTestDeviceExpiry()) \
        X("time", getTestDeviceTime()) \
    ) \
    Y( \
        X("_id", "0123-4567-89AB-CDEF-00005") \
        X("device", "TEST.path.to.device5") \
        X("type", MongoDB::nqueue::TYPE_K2) \
        X("info", "wakeup") \
        X("expires", getTestDeviceExpiry()) \
        X("time", getTestDeviceTime()) \
    )
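
// Test helpers: return the elapsed microseconds since the first call, formatted as uppercase hex
// (the expiry variant adds K2CLI_HEARTBEAT_TIMEOUT_SECONDS worth of microseconds). Both return a
// pointer to a function-local static buffer, so each call overwrites the previous value.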
static const char * getTestDeviceExpiry()
{
    static char buf[64];
    using std::chrono::duration_cast;
    using std::chrono::microseconds;
    using std::chrono::steady_clock;
    steady_clock::time_point now = steady_clock::now();
    static steady_clock::time_point start = now;
    uint64_t elapsed_microseconds = duration_cast<microseconds>(now - start).count();
    elapsed_microseconds += (1000000 * K2CLI_HEARTBEAT_TIMEOUT_SECONDS);
    snprintf(buf, sizeof(buf), "%llX", (unsigned long long)elapsed_microseconds);
    logprintf(Log::debug, "expr %s", buf);
    return buf;
}

static const char * getTestDeviceTime()
{
    static char buf[64];
    using std::chrono::duration_cast;
    using std::chrono::microseconds;
    using std::chrono::steady_clock;
    steady_clock::time_point now = steady_clock::now();
    static steady_clock::time_point start = now;
    uint64_t elapsed_microseconds = duration_cast<microseconds>(now - start).count();
    snprintf(buf, sizeof(buf), "%llX", (unsigned long long)elapsed_microseconds);
    logprintf(Log::debug, "time %s", buf);
    return buf;
}
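
// testInsert() and AddTestData() below are written against the legacy MongoDB C driver API
// (mongo*, bson, bson_append_string, mongo_insert). They appear to predate the libmongoc port
// used by the rest of this file and will not compile against the mongoc_client_t returned by
// MongoDB::GetConnection() without being ported.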
static void testInsert(mongo* connection, bson* b)
{
    if (MONGO_OK == mongo_insert(connection, MongoDB::nqueue::NAMESPACE, b, 0))
    {
        logprintf(Log::debug, "record inserted");
    }
    else
    {
        logprintf(Log::debug, "record not inserted: %s", MongoDB::ErrorString(connection->err));
    }
}
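
// Inserts the TEST_DATA records into the notification queue via the X/Y macro expansion below.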
void MongoDB::nqueue::AddTestData()
{
    logprintf(Log::debug, "<<<<<------ MongoDB notification queue test data insertion -------------------------------");
    mongo* connection = MongoDB::GetConnection();
    if (NULL == connection)
    {
        logprintf(Log::error, "MongoDB notification queue AddTestData(): not connected");
        return;
    }
#define X(key, value) \
    bson_append_string(b, key, value);
#define Y(bo) \
    { \
        bson b[1]; \
        bson_init(b); \
        bo \
        bson_finish(b); \
        testInsert(connection, b); \
    }
    TEST_DATA
#undef Y
#undef X
    logprintf(Log::debug, ">>>>>------ END MongoDB notification queue test data insertion ---------------------------");
}
#endif // TEST_MONGODB ------------------------------------------------------