/* Copyright (c) 2014 IOnU Security Inc. All rights reserved Created February 2014 by Kendrick Webster gluster_status/Daemon.cpp - implementation for Daemon.h */ #include #include #include #include #include "main.h" #include "UdpIpc.h" #include "Time.h" #include "Random.h" #include "FileSystemTest.h" #include "Daemon.h" using namespace ionu::network; using namespace ionu::udp_ipc; using namespace ionu::random; using namespace ionu::time; using namespace ionu::filesystem_test; #define STATES \ X(ST_IDLE) \ X(ST_CREATING_FILE) \ X(ST_CHECK_FILE) \ X(ST_CHECKING_FILE) \ X(ST_CHECK_FILE_REMOTE) \ X(ST_CHECKING_FILE_REMOTE) \ X(ST_DELETE_FILE) \ X(ST_DELETING_FILE) \ struct CRunParams { CRunParams(const std::string& mp, const std::string& fp, const address_list_t& p) : mount_path(mp), file_prefix(fp), peers(p) {} const std::string& mount_path; const std::string& file_prefix; const address_list_t& peers; }; namespace { constexpr uint32_t SOCKET_TIMEOUT_MILLISECONDS = 100; constexpr unsigned int STATUS_RECOVERY_PERIOD_SECONDS = 60; constexpr unsigned int TEST_CYCLE_RESTART_DELAY_SECONDS = 30; constexpr size_t TEST_FILE_RANDOM_NAME_LENGTH = 16; constexpr size_t TEST_FILE_SIZE = 64; constexpr unsigned int ZOMBIE_CLEANUP_DELAY_SECONDS = 3; bool is_faulty; bool is_remote_faulty; std::string error_message; std::string remote_error_message; CTimer timer, status_recovery_timer, remote_status_recovery_timer; enum eState { #define X(s) s, STATES #undef X } state; unsigned int test_file_serial_number; std::string test_file_name; std::string relative_test_file_name; std::string test_file_hash; CRunParams* run_params; std::set queried_addresses; struct CZombieCleanup { CTimer timer; pid_t pid; CZombieCleanup(pid_t p) : pid(p) {timer.SetSeconds(ZOMBIE_CLEANUP_DELAY_SECONDS);} }; std::list zombie_list; } #define log_and_set_error(...) logprintf_save_string(error_message, Log::error, __VA_ARGS__) //----------------------------------------------------------------------------------- static const char* stateName(unsigned int state) { switch (state) { #define X(s) case s: return #s; STATES #undef X } return "ST_?"; } static void nextState(eState next_state, unsigned int milliseconds) { state = next_state; timer.SetMilliseconds(milliseconds); } static void setFaulty(eState next_state = ST_IDLE, unsigned int seconds = 30) { is_faulty = true; status_recovery_timer.SetSeconds(STATUS_RECOVERY_PERIOD_SECONDS); logprintf(Log::error, "FILE SYSTEM IS FAULTY"); nextState(next_state, seconds * 1000); std::string message = "faulty:"; message += error_message; for (const CAddress& a: run_params->peers) { SendTo(a, message); } } static std::string getNextTestFileSerialNumberString(void) { char buf[16]; snprintf(buf, sizeof(buf), "_%.8X", ++test_file_serial_number); return buf; } //----------------------------------------------------------------------------------- // Handlers for IPC messages from FileSystemTest.cpp //----------------------------------------------------------------------------------- static void handleCreateFileResult(const std::string& result, const std::string& details) { if (ST_CREATING_FILE != state) { logprintf(Log::warning, "received create_file_result in state %s, result = %s, details = %s", stateName(state), result.c_str(), details.c_str()); } else { if (0 != result.compare("PASS")) { error_message = details; setFaulty(); } else { test_file_hash = details; nextState(ST_CHECK_FILE, 1500); } } } static void handleCheckFileResult(const std::string& result, const std::string& details, const CAddress& from) { if (ST_CHECKING_FILE == state) { if (0 != result.compare("PASS")) { error_message = details; setFaulty(); } else { if (run_params->peers.size() > 0) { nextState(ST_CHECK_FILE_REMOTE, 1500); } else { nextState(ST_DELETE_FILE, 500); } } } else if (ST_CHECKING_FILE_REMOTE == state) { if (0 != result.compare("PASS")) { error_message = details; setFaulty(); } else { queried_addresses.erase(from); if (0 == queried_addresses.size()) { logprintf(Log::debug3, "all remote file check queries have been answered%s", is_faulty ? "" : ", PASS"); nextState(ST_DELETE_FILE, 500); } } } else { logprintf(Log::warning, "received check_file_result in state %s", stateName(state)); } } static void handleDeleteFileResult(const std::string& result, const std::string& details) { if (ST_DELETING_FILE != state) { logprintf(Log::warning, "received delete_file_result in state %s, result = \"%s\", details = \"%s\"", stateName(state), result.c_str(), details.c_str()); } else { if (0 != result.compare("PASS")) { error_message = details; setFaulty(); } else { nextState(ST_IDLE, TEST_CYCLE_RESTART_DELAY_SECONDS * 1000); } } } //----------------------------------------------------------------------------------- // IPC message handler and dispatcher //----------------------------------------------------------------------------------- static void handlePacket(const CAddress& from, const std::string& message) { std::string type, body; SplitString(message, type, body); logprintf(Log::debug3, "IPC packet type = \"%s\", body = \"%s\", from %s:%d", type.c_str(), body.c_str(), IP4AddressString(from).c_str(), ntohs(from.port)); if (0 == type.compare("ping")) { SendTo(from, message); } else if (0 == type.compare("child_exit")) { // can't reap now, ... message gets here before child has fully exited zombie_list.emplace_back(static_cast(std::stoul(body))); } else if (0 == type.compare("query")) { if (is_faulty || is_remote_faulty) { std::string reply = "FAIL:"; reply += is_faulty ? error_message : remote_error_message; SendTo(from, reply); } else { SendTo(from, "PASS"); } } else if (0 == type.compare("faulty")) { is_remote_faulty = true; remote_error_message = body; remote_status_recovery_timer.SetSeconds(STATUS_RECOVERY_PERIOD_SECONDS); logprintf(Log::error, "FILE SYSTEM IS FAULTY (notified by %s:%d): %s", IP4AddressString(from).c_str(), ntohs(from.port), remote_error_message.c_str()); } else if (0 == type.compare("check_file")) { std::string name, hash; SplitString(body, name, hash); std::string path_and_name = run_params->mount_path; path_and_name += name; CheckTestFile(path_and_name, hash, from); } else // dispatch messages from FileSystemTest.cpp { std::string result, details; SplitString(body, result, details); if (0 == type.compare("create_file_result")) handleCreateFileResult (result, details); else if (0 == type.compare("check_file_result")) handleCheckFileResult (result, details, from); else if (0 == type.compare("delete_file_result")) handleDeleteFileResult (result, details); else { logprintf(Log::error, "unhandled message type: \"%s\", body = \"%s\"", type.c_str(), body.c_str()); } } } //----------------------------------------------------------------------------------- // Main loop and timers //----------------------------------------------------------------------------------- static void onIdle(void) { if (status_recovery_timer.IsExpired()) { logprintf(Log::informational, "local status recovery timer expired, reverting to non-faulty status"); logprintf(Log::informational, "remote status is %s", is_remote_faulty ? "faulty" : "good"); status_recovery_timer.Stop(); is_faulty = false; } if (remote_status_recovery_timer.IsExpired()) { logprintf(Log::informational, "remote status recovery timer expired, reverting to non-faulty status"); logprintf(Log::informational, "local status is %s", is_faulty ? "faulty" : "good"); remote_status_recovery_timer.Stop(); is_remote_faulty = false; } while (!zombie_list.empty() && zombie_list.front().timer.IsExpired()) { pid_t z = zombie_list.front().pid; logprintf(Log::debug3, "reaping zombie %d", z); pid_t r = waitpid(z, NULL, WNOHANG); if (r != z) { if (r < 0) { logprintf(Log::error, "waitpid: %s", strerror(errno)); } logprintf(Log::error, "waitpid returned %d for expected zombie %d", r, z); } zombie_list.pop_front(); } } static void onTimer(void) { switch (state) { case ST_IDLE: logprintf(Log::debug3, "creating test file"); relative_test_file_name = run_params->file_prefix; relative_test_file_name += GetRandomString(TEST_FILE_RANDOM_NAME_LENGTH); relative_test_file_name += getNextTestFileSerialNumberString(); test_file_name = run_params->mount_path; test_file_name += relative_test_file_name; CreateTestFile(test_file_name, TEST_FILE_SIZE); nextState(ST_CREATING_FILE, 5000); break; case ST_CREATING_FILE: log_and_set_error("timed out creating test file"); setFaulty(); break; case ST_CHECK_FILE: logprintf(Log::debug3, "checking test file locally"); CheckTestFile(test_file_name, test_file_hash, CAddress()); nextState(ST_CHECKING_FILE, 5000); break; case ST_CHECKING_FILE: log_and_set_error("timed out checking test file locally"); setFaulty(); break; case ST_CHECK_FILE_REMOTE: logprintf(Log::debug3, "checking test file remotely"); for (const CAddress& a: run_params->peers) { queried_addresses.insert(a); std::string s = "check_file:"; s += relative_test_file_name; s += ':'; s += test_file_hash; SendTo(a, s); } nextState(ST_CHECKING_FILE_REMOTE, 5000); break; case ST_CHECKING_FILE_REMOTE: log_and_set_error("timed out checking test file remotely"); setFaulty(); break; case ST_DELETE_FILE: logprintf(Log::debug3, "deleting test file"); DeleteTestFile(test_file_name); nextState(ST_DELETING_FILE, 5000); break; case ST_DELETING_FILE: log_and_set_error("timed out deleting test file"); setFaulty(); break; } } void ionu::gluster_status::daemon::Run(uint16_t listen_port, const std::string& mount_path, const std::string& test_file_prefix, const ionu::network::address_list_t& peers, vv_funcptr_t on_started) { if (SUCCESS != InitSocket(listen_port, SOCKET_TIMEOUT_MILLISECONDS)) { exit(EXIT_FAILURE); } if (on_started) { on_started(); } CRunParams rp(mount_path, test_file_prefix, peers); run_params = &rp; timer.SetSeconds(1); CAddress a; std::string s; for (;;) { if (ReceiveFrom(a, s)) { handlePacket(a, s); } if (timer.IsExpired()) { timer.Stop(); onTimer(); } onIdle(); } } void ionu::gluster_status::daemon::Stop(void) { unlink(test_file_name.c_str()); }