/* Copyright (c) 2013, 2014 IOnU Security Inc. All rights reserved. Copyright (c) 2015, 2016 Sequence Logic, Inc. Created August 2013 by Kendrick Webster K2Daemon/MongoDeviceStatus.cpp - implementation for MongoDeviceStatus.h */ #include #include #include #include "MongoDeviceStatus.h" namespace { // queue of operations, used for deferring write operations that are attempted during transient connection problems // (i.e. when a replica set primary goes down and MongoDB hasn't yet elected a new primary) typedef enum { OP_CLEAN, OP_UPDATE } devstatus_operation_t; struct CDevstatusOp { devstatus_operation_t op; std::string device; std::string status; std::string ipAddy; CDevstatusOp(devstatus_operation_t o) : op(o) {} CDevstatusOp(const char* d, const char* s, const char *pIpAddy) : op(OP_UPDATE), device(d), status(s), ipAddy(pIpAddy) {} }; static std::list queue; success_t doActualUpdate(mongoc_client_t *pConn, const bson_t *pQuery, const bson_t *pUpdate) { success_t r = SUCCESS; mongoc_collection_t *pCollection = mongoc_client_get_collection(pConn, MongoDB::devstatus::DATABASE_NAME, MongoDB::devstatus::COLLECTION_NAME); if (pCollection == NULL) { r = FAILURE; logprintf(Log::error, "Cannot find collection '%s' in database '%s'", MongoDB::devstatus::COLLECTION_NAME, MongoDB::devstatus::DATABASE_NAME); } else { bson_error_t err; if (!mongoc_collection_update(pCollection, MONGOC_UPDATE_MULTI_UPDATE, pQuery, pUpdate, NULL, &err)) { logprintf(Log::error, "mongoc_collection_update: \"%s\"", err.message); r = FAILURE; } mongoc_collection_destroy(pCollection); } return r; } success_t doActualBulkUpdate(mongoc_client_t *pConn, const std::vector > &updates) { success_t r = SUCCESS; mongoc_collection_t *pCollection = mongoc_client_get_collection(pConn, MongoDB::devstatus::DATABASE_NAME, MongoDB::devstatus::COLLECTION_NAME); if (pCollection == NULL) { r = FAILURE; logprintf(Log::error, "Cannot find collection '%s' in database '%s'", MongoDB::devstatus::COLLECTION_NAME, MongoDB::devstatus::DATABASE_NAME); } else { bson_error_t err; mongoc_bulk_operation_t *pBulk = mongoc_collection_create_bulk_operation(pCollection, false, NULL); if (pBulk == NULL) { r = FAILURE; logprintf(Log::error, "Cannot create bulk operation."); } else { for (size_t i = 0; i < updates.size(); ++i) mongoc_bulk_operation_update(pBulk, updates[i].first, updates[i].second, false); bson_t reply; if (!mongoc_bulk_operation_execute(pBulk, &reply, &err)) { logprintf(Log::error, "mongoc_bulk_operation_execute: \"%s\"", err.message); r = FAILURE; } char *pReplyStr = bson_as_json(&reply, NULL); logprintf(Log::debug, "Bulk update: %s", pReplyStr); bson_free(pReplyStr); bson_destroy(&reply); mongoc_bulk_operation_destroy(pBulk); } mongoc_collection_destroy(pCollection); } return r; } success_t doUpdate(mongoc_client_t* connection, const char* device, const char* status, const char *pIpAddy) { success_t r = SUCCESS; bson_t *pQuery = BCON_NEW("_id", BCON_UTF8(device)); bson_t *pUpdate = BCON_NEW("$set", "{", "status", BCON_UTF8(status), "}"); r = doActualUpdate(connection, pQuery, pUpdate); if ((r == SUCCESS) && (pIpAddy != NULL) && (pIpAddy[0] != '\0')) { bson_t *pUpdateIPAddy = BCON_NEW("$set", "{", "ip", BCON_UTF8(pIpAddy), "}"); r = doActualUpdate(connection, pQuery, pUpdateIPAddy); if (pUpdateIPAddy != NULL) bson_destroy(pUpdateIPAddy); } if (pQuery != NULL) bson_destroy(pQuery); if (pUpdate != NULL) bson_destroy(pUpdate); return r; } success_t doClean(mongoc_client_t* connection) { success_t r = SUCCESS; // Update all online stati to offline. // Use a bulk operation! std::vector > updates; bson_t *pUpdate = BCON_NEW("$set", "{", "status", BCON_UTF8( MongoDB::devstatus::STATUS_OFFLINE), "}"); bson_t *pQueryOnline = BCON_NEW("status", BCON_UTF8(MongoDB::devstatus::STATUS_ONLINE)); updates.push_back(std::make_pair(pQueryOnline, pUpdate)); pQueryOnline = BCON_NEW("status", BCON_UTF8(MongoDB::devstatus::STATUS_ONLINE_AUTH)); updates.push_back(std::make_pair(pQueryOnline, pUpdate)); pQueryOnline = BCON_NEW("status", BCON_UTF8(MongoDB::devstatus::STATUS_NOTIFIED)); updates.push_back(std::make_pair(pQueryOnline, pUpdate)); r = doActualBulkUpdate(connection, updates); if (r != SUCCESS) logprintf(Log::warning, "Cannot clean devices, error code: %d", r); for (size_t i = 0; i < updates.size(); ++i) if (updates[i].first != NULL) bson_destroy(updates[i].first); if (pUpdate != NULL) bson_destroy(pUpdate); return r; } void doFlushQueue(mongoc_client_t* connection) { unsigned int n = 0; CStopWatch stopwatch; while (!queue.empty()) { if (MONGODB_FLUSH_TIME_LIMIT_MILLISECONDS <= stopwatch.Milliseconds()) { logprintf(Log::debug, "stopping flush due to time limit (%d ms), queue size = %d, nodes processed = %d, elapsed milliseconds = %0.3lf", MONGODB_FLUSH_TIME_LIMIT_MILLISECONDS, queue.size(), n, stopwatch.Microseconds() / 1000.0); break; } else { CDevstatusOp& op = queue.front(); if (OP_CLEAN == op.op) { if (FAILURE == doClean(connection)) { break; } } else if (OP_UPDATE == op.op) { if (FAILURE == doUpdate(connection, op.device.c_str(), op.status.c_str(), op.ipAddy.c_str())) { break; } } else { logprintf(Log::critical, "encountered unknown opcode(%d) while flushing deferral queue for MongoDB device status", op.op); } } ++n; queue.pop_front(); } } void flushQueue(mongoc_client_t* connection) { if (!queue.empty()) { unsigned int n = queue.size(); logprintf(Log::debug, "flushing deferral queue, size = %d", n); doFlushQueue(connection); unsigned int n2 = queue.size(); logprintf(Log::debug, "after flush, defer queue size = %d, nodes processed = %d", n2, n - n2); } } } void MongoDB::devstatus::Update(const char* device, const char* status, const char *pIpAddy) { mongoc_client_t* connection = MongoDB::GetConnection(); if (NULL == connection) { if (MongoDB::IsTemporaryOutage()) { logprintf(Log::debug, "MongoDB device status Update(): deferring urn(%s) status(%s) due to transient connection problem", device, status); queue.emplace_back(device, status, pIpAddy); } else { logprintf(Log::critical, "MongoDB device status Update(): not connected, discarding update for urn(%s) status(%s)", device, status); } } else if (!queue.empty()) { queue.emplace_back(device, status, pIpAddy); flushQueue(connection); } else { if (FAILURE == doUpdate(connection, device, status, pIpAddy)) { logprintf(Log::warning, "MongoDB device status Update(): deferring urn(%s) status(%s) due to error", device, status); queue.emplace_back(device, status, pIpAddy); } } } // to do: when multi-deamon support is added, this needs to be revised void MongoDB::devstatus::Clean(void) { mongoc_client_t* connection = MongoDB::GetConnection(); if (NULL == connection) { if (MongoDB::IsTemporaryOutage()) { logprintf(Log::debug, "MongoDB device status Clean(): deferring call due to transient connection problem"); queue.emplace_back(OP_CLEAN); } else { logprintf(Log::critical, "MongoDB device status Clean(): not connected, ignoring call"); } } else if (!queue.empty()) { queue.emplace_back(OP_CLEAN); flushQueue(connection); } else { if (FAILURE == doClean(connection)) { logprintf(Log::warning, "MongoDB device status Clean(): deferring operation due to an error"); queue.emplace_back(OP_CLEAN); } } } void MongoDB::devstatus::Timer(void) { if (!queue.empty()) { mongoc_client_t* connection = MongoDB::GetConnection(); if (NULL != connection) { logprintf(Log::debug, "flushing deferral queue via timer"); flushQueue(connection); } } }