// MetaCore/Source/MetaCoreRuntimeData/Private/MetaCoreRuntimeDataSource.cpp
//
// 619 lines
// 20 KiB
// C++

#include "MetaCoreRuntimeData/MetaCoreRuntimeDataSource.h"
#include <algorithm>
#include <array>
#include <cctype>
#include <cmath>
#include <filesystem>
#include <fstream>
#include <memory>
#include <sstream>
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <winsock2.h>
#include <ws2tcpip.h>
namespace MetaCore {
namespace {
// File-local name for the Winsock INVALID_SOCKET sentinel; compared against the result of socket().
constexpr SOCKET MetaCoreInvalidSocket = INVALID_SOCKET;
/// Returns a copy of `value` with leading and trailing whitespace (as classified by
/// std::isspace) removed. Interior whitespace is left untouched.
[[nodiscard]] std::string MetaCoreTrim(std::string value) {
    // The parameter is unsigned char so std::isspace never receives a negative value.
    const auto isNotSpace = [](unsigned char ch) {
        return std::isspace(ch) == 0;
    };

    // Strip the leading run first; an all-whitespace string becomes empty here.
    const auto firstKept = std::find_if(value.begin(), value.end(), isNotSpace);
    value.erase(value.begin(), firstKept);

    // The reverse search stops on the last kept character; base() points just past it.
    const auto pastLastKept = std::find_if(value.rbegin(), value.rend(), isNotSpace).base();
    value.erase(pastLastKept, value.end());

    return value;
}
/// Looks up a connection setting by exact key.
///
/// @param sourceDefinition  Definition whose ConnectionSettings are searched in order.
/// @param key               Key to match (exact, case-sensitive comparison).
/// @return Pointer to the value of the first matching setting, or nullptr when no setting
///         has that key. The pointer refers into `sourceDefinition`.
[[nodiscard]] const std::string* MetaCoreFindSetting(
    const MetaCoreDataSourceDefinition& sourceDefinition,
    const std::string& key
) {
    for (const auto& candidate : sourceDefinition.ConnectionSettings) {
        if (candidate.Key == key) {
            return &candidate.Value;
        }
    }
    return nullptr;
}
/// Parses the remainder of `parser` as a payload of type `valueType` and stores it in `value`.
///
/// Recognised type names are "bool", "int64", "double", "string" and "vec3".
/// `value.Quality` is set to Good before the type name is inspected, so it is written even
/// when parsing fails; `value.Type` and the payload member are only written on success.
///
/// @param parser        Stream positioned after the type token.
/// @param valueType     Type name taken from the input line.
/// @param value         Receives the parsed payload.
/// @param errorMessage  Receives a short reason on failure; left untouched on success.
/// @return true when a payload was stored, false otherwise.
[[nodiscard]] bool MetaCoreParseValuePayload(
    std::istringstream& parser,
    const std::string& valueType,
    MetaCoreRuntimeDataValue& value,
    std::string& errorMessage
) {
    // Records the failure reason and produces the failure result in one step.
    const auto reject = [&errorMessage](const char* reason) {
        errorMessage = reason;
        return false;
    };

    value.Quality = MetaCoreRuntimeDataQuality::Good;

    if (valueType == "bool") {
        std::string literal;
        parser >> literal;
        if (parser.fail()) {
            return reject("bool value missing");
        }
        // Only "true" and "1" map to true; any other token is stored as false.
        value.Type = MetaCoreRuntimeValueType::Bool;
        value.BoolValue = literal == "true" || literal == "1";
        return true;
    }

    if (valueType == "int64") {
        std::int64_t number = 0;
        parser >> number;
        if (parser.fail()) {
            return reject("int64 value missing");
        }
        value.Type = MetaCoreRuntimeValueType::Int64;
        value.Int64Value = number;
        return true;
    }

    if (valueType == "double") {
        double number = 0.0;
        parser >> number;
        if (parser.fail()) {
            return reject("double value missing");
        }
        value.Type = MetaCoreRuntimeValueType::Double;
        value.DoubleValue = number;
        return true;
    }

    if (valueType == "string") {
        // Everything left on the line is the value. The read result is not checked, so a
        // missing payload yields an empty string rather than an error.
        std::string remainder;
        parser >> std::ws;
        std::getline(parser, remainder);
        value.Type = MetaCoreRuntimeValueType::String;
        value.StringValue = MetaCoreTrim(remainder);
        return true;
    }

    if (valueType == "vec3") {
        float x = 0.0F;
        float y = 0.0F;
        float z = 0.0F;
        parser >> x >> y >> z;
        if (parser.fail()) {
            return reject("vec3 value missing");
        }
        value.Type = MetaCoreRuntimeValueType::Vec3;
        value.Vec3Value = glm::vec3(x, y, z);
        return true;
    }

    return reject("value type not supported");
}
/// Resolves a replay file setting to a concrete path string.
///
/// Three candidate locations are probed in order: the path relative to the current working
/// directory, the same path three directory levels up, and
/// `<three levels up>/TestProject/Runtime/<file name>`. The first candidate that exists is
/// returned in weakly-canonical form. When none exists, the working-directory candidate is
/// returned unchanged so the caller can report a meaningful "unable to open" path.
///
/// @param replayFilePath  Path as written in the connection settings.
/// @return Path of the first existing candidate, or the working-directory candidate.
[[nodiscard]] std::string MetaCoreResolveReplayFilePath(const std::string& replayFilePath) {
    const std::filesystem::path inputPath(replayFilePath);
    // Query the working directory once so every candidate is derived from the same base.
    const std::filesystem::path workingDirectory = std::filesystem::current_path();
    const std::filesystem::path threeLevelsUp = workingDirectory / ".." / ".." / "..";
    const std::array<std::filesystem::path, 3> candidates{
        workingDirectory / inputPath,
        threeLevelsUp / inputPath,
        threeLevelsUp / "TestProject" / "Runtime" / inputPath.filename()
    };

    for (const auto& candidate : candidates) {
        std::error_code errorCode;
        if (!std::filesystem::exists(candidate, errorCode)) {
            continue;
        }
        const std::filesystem::path canonical = std::filesystem::weakly_canonical(candidate, errorCode);
        // On failure weakly_canonical yields an empty path; fall back to the candidate that
        // is known to exist instead of handing the caller an empty string.
        if (errorCode || canonical.empty()) {
            return candidate.string();
        }
        return canonical.string();
    }

    return candidates.front().string();
}
} // namespace
/// Returns the adapter type name "mock", the same name the factory in this file matches on.
std::string MetaCoreMockRuntimeDataSourceAdapter::GetAdapterType() const {
    return std::string{"mock"};
}
/// Stores the source definition and resets the adapter to a freshly configured,
/// disconnected state.
///
/// @param sourceDefinition  Definition to copy; its Id becomes the status SourceId.
/// @return Always true; the mock adapter has no settings to validate.
bool MetaCoreMockRuntimeDataSourceAdapter::Configure(const MetaCoreDataSourceDefinition& sourceDefinition) {
    SourceDefinition_ = sourceDefinition;

    // Restart the simulated clock and the update numbering.
    Sequence_ = 0;
    ElapsedSeconds_ = 0.0;

    // Publish a clean, disconnected status for the new source.
    Status_.SourceId = sourceDefinition.Id;
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastError.clear();
    Status_.LastUpdateAt = 0;
    Status_.LastConnectedAt = 0;

    return true;
}
/// Marks the mock source as connected. There is nothing to open, so this always succeeds.
///
/// @return Always true.
bool MetaCoreMockRuntimeDataSourceAdapter::Connect() {
    Status_.LastError.clear();
    // NOTE(review): LastConnectedAt is set to the constant 1, which looks like a placeholder
    // rather than a real timestamp; confirm.
    Status_.LastConnectedAt = 1;
    Status_.State = MetaCoreRuntimeDataSourceState::Connected;
    return true;
}
/// Marks the mock source as disconnected and clears any recorded error.
/// Elapsed time and the sequence counter are left as they are.
void MetaCoreMockRuntimeDataSourceAdapter::Disconnect() {
    Status_.LastError.clear();
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
}
/// Advances the simulated clock and refreshes the status from the emission switch.
///
/// The adapter ticks while it is Connected or Degraded. Accepting Degraded matters: this
/// method is what moves the adapter to Degraded when emission is disabled, so ignoring that
/// state would freeze the clock and keep the stale error after emission is re-enabled.
///
/// @param deltaSeconds  Time since the previous tick; negative values are treated as 0.
void MetaCoreMockRuntimeDataSourceAdapter::Tick(double deltaSeconds) {
    const bool isActive =
        Status_.State == MetaCoreRuntimeDataSourceState::Connected ||
        Status_.State == MetaCoreRuntimeDataSourceState::Degraded;
    if (!isActive) {
        return;
    }

    ElapsedSeconds_ += std::max(0.0, deltaSeconds);

    if (EmitUpdates_) {
        Status_.State = MetaCoreRuntimeDataSourceState::Connected;
        Status_.LastError.clear();
    } else {
        Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
        Status_.LastError = "Mock adapter update emission disabled";
    }
}
std::vector<MetaCoreRuntimeDataUpdate> MetaCoreMockRuntimeDataSourceAdapter::PollUpdates() {
if (Status_.State == MetaCoreRuntimeDataSourceState::Disconnected || !EmitUpdates_) {
return {};
}
const std::uint64_t timestamp = static_cast<std::uint64_t>(ElapsedSeconds_ * 1000.0);
Status_.LastUpdateAt = timestamp;
std::vector<MetaCoreRuntimeDataUpdate> updates;
updates.push_back(MetaCoreRuntimeDataUpdate{
"cube.position",
MetaCoreRuntimeDataValue{
MetaCoreRuntimeValueType::Vec3,
false,
0,
0.0,
{},
glm::vec3(static_cast<float>(std::sin(ElapsedSeconds_)) * 2.0F, 0.5F, 0.0F),
timestamp,
MetaCoreRuntimeDataQuality::Good
},
++Sequence_
});
updates.push_back(MetaCoreRuntimeDataUpdate{
"cube.visible",
MetaCoreRuntimeDataValue{
MetaCoreRuntimeValueType::Bool,
std::fmod(ElapsedSeconds_, 2.0) < 1.0,
0,
0.0,
{},
glm::vec3(0.0F, 0.0F, 0.0F),
timestamp,
MetaCoreRuntimeDataQuality::Good
},
++Sequence_
});
updates.push_back(MetaCoreRuntimeDataUpdate{
"cube.base_color",
MetaCoreRuntimeDataValue{
MetaCoreRuntimeValueType::Vec3,
false,
0,
0.0,
{},
glm::vec3(
0.5F + static_cast<float>(std::sin(ElapsedSeconds_)) * 0.5F,
0.6F,
0.9F - static_cast<float>(std::sin(ElapsedSeconds_)) * 0.3F
),
timestamp,
MetaCoreRuntimeDataQuality::Good
},
++Sequence_
});
return updates;
}
// Returns a reference to the adapter's current status record (state, timestamps, last error).
const MetaCoreRuntimeDataSourceStatus& MetaCoreMockRuntimeDataSourceAdapter::GetStatus() const {
return Status_;
}
// Enables or disables update emission. While disabled, PollUpdates() returns nothing and
// Tick() reports the adapter as Degraded.
void MetaCoreMockRuntimeDataSourceAdapter::SetEmitUpdates(bool emitUpdates) {
EmitUpdates_ = emitUpdates;
}
/// Returns the adapter type name "file_replay", the same name the factory in this file matches on.
std::string MetaCoreFileReplayRuntimeDataSourceAdapter::GetAdapterType() const {
    return std::string{"file_replay"};
}
/// Stores the source definition, resets all replay state and resolves the replay file path
/// from the "file_path" connection setting.
///
/// @param sourceDefinition  Definition to copy; must carry a non-empty "file_path" setting.
/// @return true when a path was resolved; false (state Faulted, LastError set) when the
///         setting is missing or empty. The file itself is not opened here.
bool MetaCoreFileReplayRuntimeDataSourceAdapter::Configure(const MetaCoreDataSourceDefinition& sourceDefinition) {
    SourceDefinition_ = sourceDefinition;

    // Forget everything from a previous configuration.
    Frames_.clear();
    ReplayFilePath_.clear();
    NextFrameIndex_ = 0;
    Sequence_ = 0;
    ElapsedSeconds_ = 0.0;

    Status_.SourceId = sourceDefinition.Id;
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastError.clear();
    Status_.LastUpdateAt = 0;
    Status_.LastConnectedAt = 0;

    const std::string* configuredPath = MetaCoreFindSetting(sourceDefinition, "file_path");
    const bool hasPath = configuredPath != nullptr && !configuredPath->empty();
    if (hasPath) {
        ReplayFilePath_ = MetaCoreResolveReplayFilePath(*configuredPath);
        return true;
    }

    Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
    Status_.LastError = "Missing file_path connection setting";
    return false;
}
/// Loads the replay file and, on success, marks the source as connected.
///
/// @return true when the file was loaded; false with state Faulted otherwise. LastError
///         keeps the loader's message, or a generic one when the loader left it empty.
bool MetaCoreFileReplayRuntimeDataSourceAdapter::Connect() {
    Status_.State = MetaCoreRuntimeDataSourceState::Connecting;

    const bool loaded = LoadReplayFile();
    if (loaded) {
        Status_.LastError.clear();
        // NOTE(review): LastConnectedAt is set to the constant 1, which looks like a
        // placeholder rather than a real timestamp; confirm.
        Status_.LastConnectedAt = 1;
        Status_.State = MetaCoreRuntimeDataSourceState::Connected;
        return true;
    }

    Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
    if (Status_.LastError.empty()) {
        Status_.LastError = "Failed to load replay file";
    }
    return false;
}
/// Marks the source as disconnected and rewinds playback to the first frame.
/// Loaded frames and the sequence counter are kept.
void MetaCoreFileReplayRuntimeDataSourceAdapter::Disconnect() {
    // Rewind playback.
    ElapsedSeconds_ = 0.0;
    NextFrameIndex_ = 0;

    Status_.LastError.clear();
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
}
/// Advances the replay clock and refreshes the status.
///
/// Only runs while Connected or Degraded. The source is reported Degraded when no frames
/// were loaded or when all frames have been emitted, and Connected otherwise.
///
/// @param deltaSeconds  Time since the previous tick; negative values are treated as 0.
void MetaCoreFileReplayRuntimeDataSourceAdapter::Tick(double deltaSeconds) {
    const auto currentState = Status_.State;
    const bool isRunning =
        currentState == MetaCoreRuntimeDataSourceState::Connected ||
        currentState == MetaCoreRuntimeDataSourceState::Degraded;
    if (!isRunning) {
        return;
    }

    ElapsedSeconds_ += std::max(0.0, deltaSeconds);

    // Work out why (if at all) the replay cannot deliver further frames.
    const char* degradedReason = nullptr;
    if (Frames_.empty()) {
        degradedReason = "Replay file has no frames";
    } else if (NextFrameIndex_ >= Frames_.size()) {
        degradedReason = "Replay stream exhausted";
    }

    if (degradedReason != nullptr) {
        Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
        Status_.LastError = degradedReason;
        return;
    }

    Status_.State = MetaCoreRuntimeDataSourceState::Connected;
    Status_.LastError.clear();
}
std::vector<MetaCoreRuntimeDataUpdate> MetaCoreFileReplayRuntimeDataSourceAdapter::PollUpdates() {
if (Status_.State == MetaCoreRuntimeDataSourceState::Disconnected ||
Status_.State == MetaCoreRuntimeDataSourceState::Faulted ||
Frames_.empty()) {
return {};
}
std::vector<MetaCoreRuntimeDataUpdate> updates;
while (NextFrameIndex_ < Frames_.size() &&
Frames_[NextFrameIndex_].EmitAfterSeconds <= ElapsedSeconds_) {
MetaCoreRuntimeDataUpdate update = Frames_[NextFrameIndex_].Update;
update.Sequence = ++Sequence_;
update.Value.SourceTimestamp = static_cast<std::uint64_t>(Frames_[NextFrameIndex_].EmitAfterSeconds * 1000.0);
Status_.LastUpdateAt = update.Value.SourceTimestamp;
updates.push_back(update);
++NextFrameIndex_;
}
if (NextFrameIndex_ >= Frames_.size() && updates.empty()) {
Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
Status_.LastError = "Replay stream exhausted";
}
return updates;
}
// Returns a reference to the adapter's current status record (state, timestamps, last error).
const MetaCoreRuntimeDataSourceStatus& MetaCoreFileReplayRuntimeDataSourceAdapter::GetStatus() const {
return Status_;
}
bool MetaCoreFileReplayRuntimeDataSourceAdapter::LoadReplayFile() {
Frames_.clear();
std::ifstream input(ReplayFilePath_);
if (!input) {
Status_.LastError = "Unable to open replay file: " + ReplayFilePath_;
return false;
}
std::string line;
std::size_t lineNumber = 0;
while (std::getline(input, line)) {
++lineNumber;
line = MetaCoreTrim(line);
if (line.empty() || line.front() == '#') {
continue;
}
std::istringstream parser(line);
double emitAfterSeconds = 0.0;
std::string dataPointId;
std::string valueType;
if (!(parser >> emitAfterSeconds >> dataPointId >> valueType)) {
Status_.LastError = "Replay parse error at line " + std::to_string(lineNumber);
return false;
}
MetaCoreRuntimeDataUpdate update;
update.DataPointId = dataPointId;
update.Sequence = 0;
std::string parseError;
if (!MetaCoreParseValuePayload(parser, valueType, update.Value, parseError)) {
Status_.LastError = "Replay " + parseError + " at line " + std::to_string(lineNumber);
return false;
}
Frames_.push_back(ReplayFrame{
emitAfterSeconds,
update
});
}
std::sort(Frames_.begin(), Frames_.end(), [](const ReplayFrame& lhs, const ReplayFrame& rhs) {
return lhs.EmitAfterSeconds < rhs.EmitAfterSeconds;
});
return true;
}
/// Returns the adapter type name "tcp", the same name the factory in this file matches on.
std::string MetaCoreTcpRuntimeDataSourceAdapter::GetAdapterType() const {
    return std::string{"tcp"};
}
bool MetaCoreTcpRuntimeDataSourceAdapter::Configure(const MetaCoreDataSourceDefinition& sourceDefinition) {
SourceDefinition_ = sourceDefinition;
Status_.SourceId = sourceDefinition.Id;
Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
Status_.LastConnectedAt = 0;
Status_.LastUpdateAt = 0;
Status_.LastError.clear();
Host_ = "127.0.0.1";
Port_ = 0;
Sequence_ = 0;
ElapsedSeconds_ = 0.0;
ReceiveBuffer_.clear();
PendingUpdates_.clear();
if (const std::string* host = MetaCoreFindSetting(sourceDefinition, "host"); host != nullptr && !host->empty()) {
Host_ = *host;
}
const std::string* portValue = MetaCoreFindSetting(sourceDefinition, "port");
if (portValue == nullptr || portValue->empty()) {
Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
Status_.LastError = "Missing port connection setting";
return false;
}
try {
Port_ = static_cast<std::uint16_t>(std::stoul(*portValue));
} catch (...) {
Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
Status_.LastError = "Invalid port connection setting";
return false;
}
return true;
}
bool MetaCoreTcpRuntimeDataSourceAdapter::EnsureWinsockInitialized() {
static bool initialized = false;
if (initialized) {
return true;
}
WSADATA wsaData{};
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
Status_.LastError = "WSAStartup failed";
return false;
}
initialized = true;
return true;
}
/// Opens a TCP connection to Host_:Port_ and switches the socket to non-blocking mode.
///
/// Any socket still held from an earlier Connect() is closed first. Every address returned
/// by getaddrinfo is tried in order until one connects. The connect call itself is blocking.
///
/// @return true when connected (state Connected); false with state Faulted and LastError
///         describing the step that failed.
bool MetaCoreTcpRuntimeDataSourceAdapter::Connect() {
    // Reconnecting without Disconnect() would otherwise overwrite, and leak, the old handle.
    // A partial line from the old stream must not be glued onto data from the new one.
    if (SocketHandle_ != nullptr) {
        closesocket(reinterpret_cast<SOCKET>(SocketHandle_));
        SocketHandle_ = nullptr;
        ReceiveBuffer_.clear();
    }

    Status_.State = MetaCoreRuntimeDataSourceState::Connecting;
    if (!EnsureWinsockInitialized()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        return false;
    }

    addrinfo hints{};
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    addrinfo* resolved = nullptr;
    const std::string portString = std::to_string(Port_);
    if (getaddrinfo(Host_.c_str(), portString.c_str(), &hints, &resolved) != 0 || resolved == nullptr) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "getaddrinfo failed";
        return false;
    }

    // Walk the whole result list; a host name may resolve to several addresses and the
    // first one is not necessarily reachable.
    SOCKET socketHandle = MetaCoreInvalidSocket;
    bool socketCreated = false;
    for (const addrinfo* entry = resolved; entry != nullptr; entry = entry->ai_next) {
        socketHandle = socket(entry->ai_family, entry->ai_socktype, entry->ai_protocol);
        if (socketHandle == MetaCoreInvalidSocket) {
            continue;
        }
        socketCreated = true;
        if (connect(socketHandle, entry->ai_addr, static_cast<int>(entry->ai_addrlen)) != SOCKET_ERROR) {
            break;
        }
        closesocket(socketHandle);
        socketHandle = MetaCoreInvalidSocket;
    }
    freeaddrinfo(resolved);

    if (socketHandle == MetaCoreInvalidSocket) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = socketCreated ? "tcp connect failed" : "socket creation failed";
        return false;
    }

    // Tick() drains the socket in a loop and relies on WSAEWOULDBLOCK to stop; if the
    // socket stayed blocking, that loop would stall the caller.
    u_long nonBlocking = 1;
    if (ioctlsocket(socketHandle, FIONBIO, &nonBlocking) != 0) {
        closesocket(socketHandle);
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "failed to set tcp socket non-blocking";
        return false;
    }

    SocketHandle_ = reinterpret_cast<void*>(socketHandle);
    Status_.State = MetaCoreRuntimeDataSourceState::Connected;
    // NOTE(review): LastConnectedAt is set to the constant 1, which looks like a placeholder
    // rather than a real timestamp; confirm.
    Status_.LastConnectedAt = 1;
    Status_.LastError.clear();
    return true;
}
void MetaCoreTcpRuntimeDataSourceAdapter::Disconnect() {
if (SocketHandle_ != nullptr) {
closesocket(reinterpret_cast<SOCKET>(SocketHandle_));
SocketHandle_ = nullptr;
}
Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
Status_.LastError.clear();
ReceiveBuffer_.clear();
PendingUpdates_.clear();
}
void MetaCoreTcpRuntimeDataSourceAdapter::Tick(double deltaSeconds) {
ElapsedSeconds_ += std::max(0.0, deltaSeconds);
if (SocketHandle_ == nullptr || Status_.State == MetaCoreRuntimeDataSourceState::Faulted) {
return;
}
SOCKET socketHandle = reinterpret_cast<SOCKET>(SocketHandle_);
std::array<char, 1024> buffer{};
for (;;) {
const int received = recv(socketHandle, buffer.data(), static_cast<int>(buffer.size()), 0);
if (received > 0) {
ReceiveBuffer_.append(buffer.data(), static_cast<std::size_t>(received));
Status_.State = MetaCoreRuntimeDataSourceState::Connected;
Status_.LastError.clear();
continue;
}
if (received == 0) {
Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
Status_.LastError = "tcp stream closed by peer";
break;
}
const int socketError = WSAGetLastError();
if (socketError == WSAEWOULDBLOCK) {
break;
}
Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
Status_.LastError = "tcp recv failed";
break;
}
for (;;) {
const std::size_t newlineIndex = ReceiveBuffer_.find('\n');
if (newlineIndex == std::string::npos) {
break;
}
std::string line = MetaCoreTrim(ReceiveBuffer_.substr(0, newlineIndex));
ReceiveBuffer_.erase(0, newlineIndex + 1);
if (line.empty() || line.front() == '#') {
continue;
}
MetaCoreRuntimeDataUpdate update;
if (!ParseSocketLine(line, update)) {
Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
if (Status_.LastError.empty()) {
Status_.LastError = "tcp line parse failed";
}
continue;
}
PendingUpdates_.push_back(std::move(update));
}
}
std::vector<MetaCoreRuntimeDataUpdate> MetaCoreTcpRuntimeDataSourceAdapter::PollUpdates() {
if (PendingUpdates_.empty()) {
return {};
}
const std::uint64_t timestamp = static_cast<std::uint64_t>(ElapsedSeconds_ * 1000.0);
for (auto& update : PendingUpdates_) {
update.Sequence = ++Sequence_;
update.Value.SourceTimestamp = timestamp;
}
Status_.LastUpdateAt = timestamp;
std::vector<MetaCoreRuntimeDataUpdate> updates = std::move(PendingUpdates_);
PendingUpdates_.clear();
return updates;
}
// Returns a reference to the adapter's current status record (state, timestamps, last error).
const MetaCoreRuntimeDataSourceStatus& MetaCoreTcpRuntimeDataSourceAdapter::GetStatus() const {
return Status_;
}
/// Parses one received line of the form `<data-point-id> <value-type> <payload...>`.
///
/// @param line    Trimmed, non-empty line without its terminating newline.
/// @param update  Reset and filled with the data point id and value. It is also reset when
///                the payload fails to parse, and left untouched when the header is missing.
/// @return true on success; false with Status_.LastError set to a "tcp ..." message.
bool MetaCoreTcpRuntimeDataSourceAdapter::ParseSocketLine(const std::string& line, MetaCoreRuntimeDataUpdate& update) {
    std::istringstream parser(line);

    std::string dataPointId;
    std::string valueType;
    parser >> dataPointId >> valueType;
    if (parser.fail()) {
        Status_.LastError = "tcp line missing header fields";
        return false;
    }

    update = MetaCoreRuntimeDataUpdate{};
    update.DataPointId = dataPointId;

    std::string payloadError;
    const bool payloadParsed = MetaCoreParseValuePayload(parser, valueType, update.Value, payloadError);
    if (payloadParsed) {
        return true;
    }

    Status_.LastError = "tcp " + payloadError;
    return false;
}
std::unique_ptr<MetaCoreIRuntimeDataSourceAdapter> MetaCoreCreateRuntimeDataSourceAdapter(
const std::string& adapterType
) {
if (adapterType == "mock") {
return std::make_unique<MetaCoreMockRuntimeDataSourceAdapter>();
}
if (adapterType == "file_replay") {
return std::make_unique<MetaCoreFileReplayRuntimeDataSourceAdapter>();
}
if (adapterType == "tcp") {
return std::make_unique<MetaCoreTcpRuntimeDataSourceAdapter>();
}
return nullptr;
}
} // namespace MetaCore