Sleds/gluster_status/Daemon.cpp

422 lines
12 KiB
C++

/*
Copyright (c) 2014 IOnU Security Inc. All rights reserved
Created February 2014 by Kendrick Webster
gluster_status/Daemon.cpp - implementation for Daemon.h
*/
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <set>
#include "main.h"
#include "UdpIpc.h"
#include "Time.h"
#include "Random.h"
#include "FileSystemTest.h"
#include "Daemon.h"
using namespace ionu::network;
using namespace ionu::udp_ipc;
using namespace ionu::random;
using namespace ionu::time;
using namespace ionu::filesystem_test;
// X-macro listing every state of the test-cycle state machine, in order.
// Usage: #define X(s) to whatever is needed, expand STATES, then #undef X.
// It is expanded further down to build the eState enum and the name lookup
// in stateName().
// NOTE: the final entry must NOT end with a line-continuation backslash;
// a trailing backslash splices the following source line into the macro body.
#define STATES \
    X(ST_IDLE) \
    X(ST_CREATING_FILE) \
    X(ST_CHECK_FILE) \
    X(ST_CHECKING_FILE) \
    X(ST_CHECK_FILE_REMOTE) \
    X(ST_CHECKING_FILE_REMOTE) \
    X(ST_DELETE_FILE) \
    X(ST_DELETING_FILE)
// Bundle of the settings handed to Run(); the rest of this file reaches it
// through the run_params pointer.
// The members are references, not copies, so the referenced objects must
// outlive the CRunParams instance.
struct CRunParams
{
    const std::string&    mount_path;   // prepended to test file names to form full paths
    const std::string&    file_prefix;  // leading part of every generated test file name
    const address_list_t& peers;        // addresses that are sent "faulty" and "check_file" messages

    CRunParams(const std::string& mount, const std::string& prefix, const address_list_t& peer_list)
        : mount_path(mount)
        , file_prefix(prefix)
        , peers(peer_list)
    {
    }
};
namespace
{
constexpr uint32_t SOCKET_TIMEOUT_MILLISECONDS = 100;
constexpr unsigned int STATUS_RECOVERY_PERIOD_SECONDS = 60;
constexpr unsigned int TEST_CYCLE_RESTART_DELAY_SECONDS = 30;
constexpr size_t TEST_FILE_RANDOM_NAME_LENGTH = 16;
constexpr size_t TEST_FILE_SIZE = 64;
constexpr unsigned int ZOMBIE_CLEANUP_DELAY_SECONDS = 3;
bool is_faulty;
bool is_remote_faulty;
std::string error_message;
std::string remote_error_message;
CTimer timer, status_recovery_timer, remote_status_recovery_timer;
enum eState
{
#define X(s) s,
STATES
#undef X
}
state;
unsigned int test_file_serial_number;
std::string test_file_name;
std::string relative_test_file_name;
std::string test_file_hash;
CRunParams* run_params;
std::set<CAddress> queried_addresses;
struct CZombieCleanup
{
CTimer timer;
pid_t pid;
CZombieCleanup(pid_t p) : pid(p) {timer.SetSeconds(ZOMBIE_CLEANUP_DELAY_SECONDS);}
};
std::list<CZombieCleanup> zombie_list;
}
// Forwards a printf-style message to logprintf_save_string(), with
// error_message as the string argument and Log::error as the level.
// NOTE(review): presumably this logs the text and also stores it in
// error_message -- confirm against logprintf_save_string().
#define log_and_set_error(...) logprintf_save_string(error_message, Log::error, __VA_ARGS__)
//-----------------------------------------------------------------------------------
// Returns the identifier of a state as text (for example "ST_IDLE"), or
// "ST_?" when the value does not match any entry of STATES.
static const char* stateName(unsigned int state)
{
    // Generated from the same X-macro list as the eState enum, so the entry
    // at index i is the name of the enumerator whose value is i.
    static const char* const names[] =
    {
#define X(s) #s,
        STATES
#undef X
    };
    const unsigned int count = sizeof(names) / sizeof(names[0]);
    return (state < count) ? names[state] : "ST_?";
}
// Schedules a state transition: arms the state-machine timer with the given
// number of milliseconds and records next_state as the current state.
// Run() calls onTimer() once that timer reports expiry.
static void nextState(eState next_state, unsigned int milliseconds)
{
    timer.SetMilliseconds(milliseconds);
    state = next_state;
}
// Marks the local file system as faulty.
// Starts the local status recovery timer, logs the condition, schedules
// next_state after the given number of seconds and sends a
// "faulty:<error_message>" message to every configured peer.
// Callers set error_message before calling this.
static void setFaulty(eState next_state = ST_IDLE, unsigned int seconds = 30)
{
    is_faulty = true;
    status_recovery_timer.SetSeconds(STATUS_RECOVERY_PERIOD_SECONDS);
    logprintf(Log::error, "FILE SYSTEM IS FAULTY");
    nextState(next_state, seconds * 1000);

    const std::string notification = "faulty:" + error_message;
    for (const CAddress& peer : run_params->peers)
    {
        SendTo(peer, notification);
    }
}
// Advances the test file serial number and returns it formatted as an
// underscore followed by at least eight upper-case hexadecimal digits
// (for example "_0000002A").
static std::string getNextTestFileSerialNumberString(void)
{
    ++test_file_serial_number;
    char text[16];
    snprintf(text, sizeof(text), "_%.8X", test_file_serial_number);
    return std::string(text);
}
//-----------------------------------------------------------------------------------
// Handlers for IPC messages from FileSystemTest.cpp
//-----------------------------------------------------------------------------------
// Handles a "create_file_result" message.
// result:  "PASS" on success, anything else is treated as a failure.
// details: the test file hash on success, otherwise the error text.
// Only acted upon in ST_CREATING_FILE; in any other state it is just logged.
static void handleCreateFileResult(const std::string& result, const std::string& details)
{
    if (state != ST_CREATING_FILE)
    {
        logprintf(Log::warning, "received create_file_result in state %s, result = %s, details = %s",
            stateName(state), result.c_str(), details.c_str());
        return;
    }

    if (result != "PASS")
    {
        error_message = details;
        setFaulty();
        return;
    }

    // success: remember the hash and check the file after a short delay
    test_file_hash = details;
    nextState(ST_CHECK_FILE, 1500);
}
// Handles a "check_file_result" message.
// result:  "PASS" on success, anything else is treated as a failure.
// details: error text on failure.
// from:    sender of the message; used to tick off answered peers while in
//          ST_CHECKING_FILE_REMOTE.
// Only acted upon in ST_CHECKING_FILE and ST_CHECKING_FILE_REMOTE; in any
// other state the message is just logged.
static void handleCheckFileResult(const std::string& result, const std::string& details, const CAddress& from)
{
    const bool awaiting_local  = (ST_CHECKING_FILE == state);
    const bool awaiting_remote = (ST_CHECKING_FILE_REMOTE == state);

    if (!awaiting_local && !awaiting_remote)
    {
        logprintf(Log::warning, "received check_file_result in state %s", stateName(state));
        return;
    }

    // a failure is handled the same way for the local and the remote check
    if (result != "PASS")
    {
        error_message = details;
        setFaulty();
        return;
    }

    if (awaiting_local)
    {
        // local check passed: ask the peers next, or go straight to deletion
        // when there are none
        if (0 == run_params->peers.size())
        {
            nextState(ST_DELETE_FILE, 500);
        }
        else
        {
            nextState(ST_CHECK_FILE_REMOTE, 1500);
        }
        return;
    }

    // remote check passed for this peer; continue once every peer has answered
    queried_addresses.erase(from);
    if (queried_addresses.empty())
    {
        logprintf(Log::debug3, "all remote file check queries have been answered%s", is_faulty ? "" : ", PASS");
        nextState(ST_DELETE_FILE, 500);
    }
}
// Handles a "delete_file_result" message.
// result:  "PASS" on success, anything else is treated as a failure.
// details: error text on failure.
// Only acted upon in ST_DELETING_FILE; in any other state it is just logged.
static void handleDeleteFileResult(const std::string& result, const std::string& details)
{
    if (state != ST_DELETING_FILE)
    {
        logprintf(Log::warning, "received delete_file_result in state %s, result = \"%s\", details = \"%s\"",
            stateName(state), result.c_str(), details.c_str());
        return;
    }

    if (result != "PASS")
    {
        error_message = details;
        setFaulty();
        return;
    }

    // cycle finished successfully: idle until the next cycle is due
    nextState(ST_IDLE, TEST_CYCLE_RESTART_DELAY_SECONDS * 1000);
}
//-----------------------------------------------------------------------------------
// IPC message handler and dispatcher
//-----------------------------------------------------------------------------------
// Parses the process id carried by a "child_exit" message.
// Returns true and stores the id in pid only when text begins (after optional
// white space) with a decimal number that is strictly positive and fits in
// pid_t. Returns false otherwise and leaves pid untouched.
// Unlike std::stoul this never throws, so a malformed packet cannot terminate
// the daemon through an uncaught exception.
static bool parseChildPid(const std::string& text, pid_t& pid)
{
    const char* const begin = text.c_str();
    char* end = nullptr;
    errno = 0;
    const unsigned long value = strtoul(begin, &end, 10);
    if (end == begin || 0 != errno)
    {
        return false; // no digits at all, or out of range for unsigned long
    }
    const pid_t candidate = static_cast<pid_t>(value);
    // waitpid() gives pid values <= 0 a special meaning (any child / process
    // group), so only positive ids that survive the narrowing are accepted.
    if (candidate <= 0 || static_cast<unsigned long>(candidate) != value)
    {
        return false;
    }
    pid = candidate;
    return true;
}

// Decodes one received IPC packet and acts on it.
// from:    address the packet was received from.
// message: raw packet text; SplitString() separates it into a type and a body.
// Message types handled here:
//   ping        - echoed back to the sender
//   child_exit  - body is a child pid, queued for reaping by onIdle()
//   query       - answered with "PASS" or "FAIL:<error text>"
//   faulty      - a peer reports a faulty file system; body is its error text
//   check_file  - body is "<name>:<hash>"; the file under the mount path is checked
//   *_result    - results from FileSystemTest.cpp, dispatched to the handlers above
static void handlePacket(const CAddress& from, const std::string& message)
{
    std::string type, body;
    SplitString(message, type, body);
    logprintf(Log::debug3, "IPC packet type = \"%s\", body = \"%s\", from %s:%d",
        type.c_str(), body.c_str(), IP4AddressString(from).c_str(), ntohs(from.port));
    if (0 == type.compare("ping"))
    {
        SendTo(from, message);
    }
    else if (0 == type.compare("child_exit"))
    {
        // can't reap now: the message gets here before the child has fully
        // exited, so the pid is queued and reaped later from onIdle()
        pid_t pid = 0;
        if (parseChildPid(body, pid))
        {
            zombie_list.emplace_back(pid);
        }
        else
        {
            logprintf(Log::warning, "ignoring child_exit with invalid pid \"%s\"", body.c_str());
        }
    }
    else if (0 == type.compare("query"))
    {
        if (is_faulty || is_remote_faulty)
        {
            // the local error text takes precedence over the one reported by a peer
            std::string reply = "FAIL:";
            reply += is_faulty ? error_message : remote_error_message;
            SendTo(from, reply);
        }
        else
        {
            SendTo(from, "PASS");
        }
    }
    else if (0 == type.compare("faulty"))
    {
        is_remote_faulty = true;
        remote_error_message = body;
        remote_status_recovery_timer.SetSeconds(STATUS_RECOVERY_PERIOD_SECONDS);
        logprintf(Log::error, "FILE SYSTEM IS FAULTY (notified by %s:%d): %s",
            IP4AddressString(from).c_str(), ntohs(from.port), remote_error_message.c_str());
    }
    else if (0 == type.compare("check_file"))
    {
        std::string name, hash;
        SplitString(body, name, hash);
        // NOTE(review): name comes straight from the packet and is appended to
        // the mount path without validation (e.g. "../" components) -- confirm
        // that only trusted peers can reach this socket.
        std::string path_and_name = run_params->mount_path;
        path_and_name += name;
        CheckTestFile(path_and_name, hash, from);
    }
    else // dispatch messages from FileSystemTest.cpp
    {
        std::string result, details;
        SplitString(body, result, details);
        if (0 == type.compare("create_file_result"))
        {
            handleCreateFileResult(result, details);
        }
        else if (0 == type.compare("check_file_result"))
        {
            handleCheckFileResult(result, details, from);
        }
        else if (0 == type.compare("delete_file_result"))
        {
            handleDeleteFileResult(result, details);
        }
        else
        {
            logprintf(Log::error, "unhandled message type: \"%s\", body = \"%s\"", type.c_str(), body.c_str());
        }
    }
}
//-----------------------------------------------------------------------------------
// Main loop and timers
//-----------------------------------------------------------------------------------
static void onIdle(void)
{
if (status_recovery_timer.IsExpired())
{
logprintf(Log::informational, "local status recovery timer expired, reverting to non-faulty status");
logprintf(Log::informational, "remote status is %s", is_remote_faulty ? "faulty" : "good");
status_recovery_timer.Stop();
is_faulty = false;
}
if (remote_status_recovery_timer.IsExpired())
{
logprintf(Log::informational, "remote status recovery timer expired, reverting to non-faulty status");
logprintf(Log::informational, "local status is %s", is_faulty ? "faulty" : "good");
remote_status_recovery_timer.Stop();
is_remote_faulty = false;
}
while (!zombie_list.empty() && zombie_list.front().timer.IsExpired())
{
pid_t z = zombie_list.front().pid;
logprintf(Log::debug3, "reaping zombie %d", z);
pid_t r = waitpid(z, NULL, WNOHANG);
if (r != z)
{
if (r < 0)
{
logprintf(Log::error, "waitpid: %s", strerror(errno));
}
logprintf(Log::error, "waitpid returned %d for expected zombie %d", r, z);
}
zombie_list.pop_front();
}
}
static void onTimer(void)
{
switch (state)
{
case ST_IDLE:
logprintf(Log::debug3, "creating test file");
relative_test_file_name = run_params->file_prefix;
relative_test_file_name += GetRandomString(TEST_FILE_RANDOM_NAME_LENGTH);
relative_test_file_name += getNextTestFileSerialNumberString();
test_file_name = run_params->mount_path;
test_file_name += relative_test_file_name;
CreateTestFile(test_file_name, TEST_FILE_SIZE);
nextState(ST_CREATING_FILE, 5000);
break;
case ST_CREATING_FILE:
log_and_set_error("timed out creating test file");
setFaulty();
break;
case ST_CHECK_FILE:
logprintf(Log::debug3, "checking test file locally");
CheckTestFile(test_file_name, test_file_hash, CAddress());
nextState(ST_CHECKING_FILE, 5000);
break;
case ST_CHECKING_FILE:
log_and_set_error("timed out checking test file locally");
setFaulty();
break;
case ST_CHECK_FILE_REMOTE:
logprintf(Log::debug3, "checking test file remotely");
for (const CAddress& a: run_params->peers)
{
queried_addresses.insert(a);
std::string s = "check_file:";
s += relative_test_file_name;
s += ':';
s += test_file_hash;
SendTo(a, s);
}
nextState(ST_CHECKING_FILE_REMOTE, 5000);
break;
case ST_CHECKING_FILE_REMOTE:
log_and_set_error("timed out checking test file remotely");
setFaulty();
break;
case ST_DELETE_FILE:
logprintf(Log::debug3, "deleting test file");
DeleteTestFile(test_file_name);
nextState(ST_DELETING_FILE, 5000);
break;
case ST_DELETING_FILE:
log_and_set_error("timed out deleting test file");
setFaulty();
break;
}
}
// Daemon entry point. Does not return: it exits the process when the socket
// cannot be initialised and otherwise loops forever.
// listen_port:      port passed to InitSocket().
// mount_path:       prepended to test file names to form full paths.
// test_file_prefix: leading part of every generated test file name.
// peers:            addresses that are notified of faults and asked to check the test file.
// on_started:       optional callback, invoked once after the socket has been initialised.
void ionu::gluster_status::daemon::Run(uint16_t listen_port,
    const std::string& mount_path, const std::string& test_file_prefix,
    const ionu::network::address_list_t& peers, vv_funcptr_t on_started)
{
    const bool socket_ready = (SUCCESS == InitSocket(listen_port, SOCKET_TIMEOUT_MILLISECONDS));
    if (!socket_ready)
    {
        exit(EXIT_FAILURE);
    }

    if (on_started)
    {
        on_started();
    }

    // The parameters live on this stack frame; that is safe because the loop
    // below never returns.
    CRunParams params(mount_path, test_file_prefix, peers);
    run_params = &params;

    // first timer expiry (state is ST_IDLE) starts the first test cycle
    timer.SetSeconds(1);

    CAddress sender;
    std::string packet;
    while (true)
    {
        if (ReceiveFrom(sender, packet))
        {
            handlePacket(sender, packet);
        }

        if (timer.IsExpired())
        {
            // stop before dispatching so that onTimer() can re-arm the timer
            timer.Stop();
            onTimer();
        }

        onIdle();
    }
}
// Removes the most recently named test file. The result of unlink() is
// ignored, so a missing file is not reported.
void ionu::gluster_status::daemon::Stop(void)
{
    const char* const leftover = test_file_name.c_str();
    unlink(leftover);
}