#include "MetaCoreRuntimeData/MetaCoreRuntimeDataSource.h"

// NOTE(review): the standard include list was reconstructed from the names this
// file uses; confirm it against the project's original include set.
#include <algorithm>
#include <array>
#include <cctype>
#include <charconv>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <filesystem>
#include <fstream>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <winsock2.h>
#include <ws2tcpip.h>

namespace MetaCore {

namespace {

constexpr SOCKET MetaCoreInvalidSocket = INVALID_SOCKET;

/// Number of bytes requested from recv() per call. Only affects how many recv()
/// calls are needed to drain the socket, not what is received.
/// NOTE(review): the original array size was not recoverable; 4096 is a reconstruction.
constexpr std::size_t MetaCoreTcpReceiveChunkBytes = 4096;

/// Upper bound for a buffered, not yet newline-terminated TCP line. A peer that
/// never sends '\n' would otherwise grow the receive buffer without limit.
constexpr std::size_t MetaCoreTcpMaxLineBytes = 1024U * 1024U;

/// Owning pointer type returned by MetaCoreCreateRuntimeDataSourceAdapter, taken
/// from the factory's own declaration so this file cannot disagree with the header.
/// NOTE(review): relies on the factory being declared in the included header.
using MetaCoreRuntimeDataSourceAdapterPtr =
    decltype(MetaCoreCreateRuntimeDataSourceAdapter(std::declval<const std::string&>()));

/// Returns @p value with leading and trailing whitespace (per std::isspace) removed.
[[nodiscard]] std::string MetaCoreTrim(std::string value) {
    const auto isNotSpace = [](unsigned char ch) { return std::isspace(ch) == 0; };
    value.erase(value.begin(), std::find_if(value.begin(), value.end(), isNotSpace));
    value.erase(std::find_if(value.rbegin(), value.rend(), isNotSpace).base(), value.end());
    return value;
}

/// Converts a duration in seconds to whole milliseconds.
/// Negative and NaN inputs yield 0: converting such a double to an unsigned
/// integer is undefined behaviour, so they are clamped before the cast.
[[nodiscard]] std::uint64_t MetaCoreSecondsToMilliseconds(double seconds) {
    if (!(seconds > 0.0)) {
        return 0;
    }
    return static_cast<std::uint64_t>(seconds * 1000.0);
}

/// Parses a TCP port number from @p text (surrounding whitespace is ignored).
/// @return the port, or std::nullopt when the text is not a plain decimal
///         number in the range 1..65535.
[[nodiscard]] std::optional<std::uint16_t> MetaCoreParsePort(const std::string& text) {
    const std::string trimmed = MetaCoreTrim(text);
    const char* const first = trimmed.data();
    const char* const last = first + trimmed.size();

    unsigned long parsed = 0;
    const auto [parseEnd, parseStatus] = std::from_chars(first, last, parsed);
    // parseEnd != last rejects trailing garbage such as "80abc".
    if (parseStatus != std::errc{} || parseEnd != last || parsed == 0 || parsed > 65535UL) {
        return std::nullopt;
    }
    return static_cast<std::uint16_t>(parsed);
}

/// Looks up a connection setting by key.
/// @return pointer to the value of the first setting whose Key equals @p key,
///         or nullptr when there is none. The pointer refers into @p sourceDefinition.
[[nodiscard]] const std::string* MetaCoreFindSetting(
    const MetaCoreDataSourceDefinition& sourceDefinition,
    const std::string& key
) {
    const auto& settings = sourceDefinition.ConnectionSettings;
    const auto match = std::find_if(
        settings.begin(),
        settings.end(),
        [&](const MetaCoreDataSourceSetting& setting) { return setting.Key == key; }
    );
    return match == settings.end() ? nullptr : &match->Value;
}

/// Reads the value part of a data line from @p parser into @p value.
///
/// Supported @p valueType names: "bool", "int64", "double", "string", "vec3".
/// - "bool": the tokens "true" and "1" are true; any other token is false.
/// - "string": the rest of the line, trimmed; may be empty.
/// - "vec3": three whitespace-separated floats.
///
/// @param errorMessage receives a short description when parsing fails.
/// @return true on success; false for a missing value or an unknown type.
[[nodiscard]] bool MetaCoreParseValuePayload(
    std::istringstream& parser,
    const std::string& valueType,
    MetaCoreRuntimeDataValue& value,
    std::string& errorMessage
) {
    value.Quality = MetaCoreRuntimeDataQuality::Good;

    if (valueType == "bool") {
        std::string token;
        if (!(parser >> token)) {
            errorMessage = "bool value missing";
            return false;
        }
        value.Type = MetaCoreRuntimeValueType::Bool;
        value.BoolValue = (token == "true" || token == "1");
        return true;
    }

    if (valueType == "int64") {
        std::int64_t parsedValue = 0;
        if (!(parser >> parsedValue)) {
            errorMessage = "int64 value missing";
            return false;
        }
        value.Type = MetaCoreRuntimeValueType::Int64;
        value.Int64Value = parsedValue;
        return true;
    }

    if (valueType == "double") {
        double parsedValue = 0.0;
        if (!(parser >> parsedValue)) {
            errorMessage = "double value missing";
            return false;
        }
        value.Type = MetaCoreRuntimeValueType::Double;
        value.DoubleValue = parsedValue;
        return true;
    }

    if (valueType == "string") {
        std::string parsedValue;
        std::getline(parser >> std::ws, parsedValue);
        value.Type = MetaCoreRuntimeValueType::String;
        value.StringValue = MetaCoreTrim(parsedValue);
        return true;
    }

    if (valueType == "vec3") {
        float x = 0.0F;
        float y = 0.0F;
        float z = 0.0F;
        if (!(parser >> x >> y >> z)) {
            errorMessage = "vec3 value missing";
            return false;
        }
        value.Type = MetaCoreRuntimeValueType::Vec3;
        value.Vec3Value = glm::vec3(x, y, z);
        return true;
    }

    errorMessage = "value type not supported";
    return false;
}

/// Resolves the configured replay file path to a concrete location.
///
/// Tries, in order: the path relative to the working directory, the path three
/// directories above it, and "TestProject/Runtime/<file name>" three directories
/// above it. The first candidate that exists is returned in weakly-canonical
/// form. When none exists, the working-directory-relative path is returned
/// unchanged so the caller's error message names a sensible location.
[[nodiscard]] std::string MetaCoreResolveReplayFilePath(const std::string& replayFilePath) {
    const std::filesystem::path inputPath(replayFilePath);

    // Non-throwing overload: on failure the working directory is an empty path
    // and the candidates below degrade to plain relative paths.
    std::error_code workingDirectoryError;
    const std::filesystem::path workingDirectory = std::filesystem::current_path(workingDirectoryError);
    const std::filesystem::path threeLevelsUp = workingDirectory / ".." / ".." / "..";

    const std::vector<std::filesystem::path> candidates{
        workingDirectory / inputPath,
        threeLevelsUp / inputPath,
        threeLevelsUp / "TestProject" / "Runtime" / inputPath.filename()
    };

    for (const auto& candidate : candidates) {
        std::error_code errorCode;
        if (!std::filesystem::exists(candidate, errorCode)) {
            continue;
        }
        const std::filesystem::path canonical = std::filesystem::weakly_canonical(candidate, errorCode);
        // weakly_canonical reports failure through errorCode and an empty path;
        // the candidate is known to exist, so use it as-is in that case.
        if (errorCode || canonical.empty()) {
            return candidate.string();
        }
        return canonical.string();
    }

    return (workingDirectory / inputPath).string();
}

} // namespace

// ---------------------------------------------------------------------------
// Mock adapter: emits synthetic values derived from the elapsed time.
// ---------------------------------------------------------------------------

std::string MetaCoreMockRuntimeDataSourceAdapter::GetAdapterType() const {
    return "mock";
}

/// Stores the definition and resets status, clock and sequence counter. Always succeeds.
bool MetaCoreMockRuntimeDataSourceAdapter::Configure(const MetaCoreDataSourceDefinition& sourceDefinition) {
    SourceDefinition_ = sourceDefinition;
    Status_.SourceId = sourceDefinition.Id;
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastConnectedAt = 0;
    Status_.LastUpdateAt = 0;
    Status_.LastError.clear();
    ElapsedSeconds_ = 0.0;
    Sequence_ = 0;
    return true;
}

/// Marks the adapter as connected. Always succeeds.
bool MetaCoreMockRuntimeDataSourceAdapter::Connect() {
    Status_.State = MetaCoreRuntimeDataSourceState::Connected;
    // NOTE(review): the constant 1 looks like a placeholder timestamp — confirm.
    Status_.LastConnectedAt = 1;
    Status_.LastError.clear();
    return true;
}

void MetaCoreMockRuntimeDataSourceAdapter::Disconnect() {
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastError.clear();
}

/// Advances the mock clock and refreshes the state from the emission switch.
/// Negative @p deltaSeconds values are treated as 0.
void MetaCoreMockRuntimeDataSourceAdapter::Tick(double deltaSeconds) {
    // Degraded must be accepted as well as Connected: Degraded is the state this
    // function itself sets while emission is disabled, and rejecting it would
    // leave the adapter Degraded (with a frozen clock) after emission is re-enabled.
    const bool isLive = Status_.State == MetaCoreRuntimeDataSourceState::Connected ||
                        Status_.State == MetaCoreRuntimeDataSourceState::Degraded;
    if (!isLive) {
        return;
    }

    ElapsedSeconds_ += std::max(0.0, deltaSeconds);

    if (EmitUpdates_) {
        Status_.State = MetaCoreRuntimeDataSourceState::Connected;
        Status_.LastError.clear();
    } else {
        Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
        Status_.LastError = "Mock adapter update emission disabled";
    }
}

/// Produces one update each for "cube.position", "cube.visible" and
/// "cube.base_color", all stamped with the elapsed time in milliseconds.
/// Returns nothing while disconnected or while emission is disabled.
std::vector<MetaCoreRuntimeDataUpdate> MetaCoreMockRuntimeDataSourceAdapter::PollUpdates() {
    if (Status_.State == MetaCoreRuntimeDataSourceState::Disconnected || !EmitUpdates_) {
        return {};
    }

    const std::uint64_t timestamp = MetaCoreSecondsToMilliseconds(ElapsedSeconds_);
    Status_.LastUpdateAt = timestamp;

    const float wave = static_cast<float>(std::sin(ElapsedSeconds_));

    std::vector<MetaCoreRuntimeDataUpdate> updates;
    updates.reserve(3);

    updates.push_back(MetaCoreRuntimeDataUpdate{
        "cube.position",
        MetaCoreRuntimeDataValue{
            MetaCoreRuntimeValueType::Vec3,
            false,
            0,
            0.0,
            {},
            glm::vec3(wave * 2.0F, 0.5F, 0.0F),
            timestamp,
            MetaCoreRuntimeDataQuality::Good
        },
        ++Sequence_
    });

    updates.push_back(MetaCoreRuntimeDataUpdate{
        "cube.visible",
        MetaCoreRuntimeDataValue{
            MetaCoreRuntimeValueType::Bool,
            // Visible during the first second of every two-second period.
            std::fmod(ElapsedSeconds_, 2.0) < 1.0,
            0,
            0.0,
            {},
            glm::vec3(0.0F, 0.0F, 0.0F),
            timestamp,
            MetaCoreRuntimeDataQuality::Good
        },
        ++Sequence_
    });

    updates.push_back(MetaCoreRuntimeDataUpdate{
        "cube.base_color",
        MetaCoreRuntimeDataValue{
            MetaCoreRuntimeValueType::Vec3,
            false,
            0,
            0.0,
            {},
            glm::vec3(0.5F + wave * 0.5F, 0.6F, 0.9F - wave * 0.3F),
            timestamp,
            MetaCoreRuntimeDataQuality::Good
        },
        ++Sequence_
    });

    return updates;
}

const MetaCoreRuntimeDataSourceStatus& MetaCoreMockRuntimeDataSourceAdapter::GetStatus() const {
    return Status_;
}

void MetaCoreMockRuntimeDataSourceAdapter::SetEmitUpdates(bool emitUpdates) {
    EmitUpdates_ = emitUpdates;
}

// ---------------------------------------------------------------------------
// File replay adapter: replays timestamped updates loaded from a text file.
// ---------------------------------------------------------------------------

std::string MetaCoreFileReplayRuntimeDataSourceAdapter::GetAdapterType() const {
    return "file_replay";
}

/// Stores the definition, resets all replay state and resolves the "file_path"
/// connection setting.
/// @return false (state Faulted) when "file_path" is missing or empty.
bool MetaCoreFileReplayRuntimeDataSourceAdapter::Configure(const MetaCoreDataSourceDefinition& sourceDefinition) {
    SourceDefinition_ = sourceDefinition;
    Status_.SourceId = sourceDefinition.Id;
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastConnectedAt = 0;
    Status_.LastUpdateAt = 0;
    Status_.LastError.clear();
    Frames_.clear();
    ReplayFilePath_.clear();
    NextFrameIndex_ = 0;
    ElapsedSeconds_ = 0.0;
    Sequence_ = 0;

    const std::string* replayFilePath = MetaCoreFindSetting(sourceDefinition, "file_path");
    if (replayFilePath == nullptr || replayFilePath->empty()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "Missing file_path connection setting";
        return false;
    }

    ReplayFilePath_ = MetaCoreResolveReplayFilePath(*replayFilePath);
    return true;
}

/// Loads the replay file and, on success, marks the adapter connected.
/// @return false (state Faulted, LastError set) when the file cannot be loaded.
bool MetaCoreFileReplayRuntimeDataSourceAdapter::Connect() {
    Status_.State = MetaCoreRuntimeDataSourceState::Connecting;

    if (!LoadReplayFile()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        if (Status_.LastError.empty()) {
            Status_.LastError = "Failed to load replay file";
        }
        return false;
    }

    Status_.State = MetaCoreRuntimeDataSourceState::Connected;
    // NOTE(review): the constant 1 looks like a placeholder timestamp — confirm.
    Status_.LastConnectedAt = 1;
    Status_.LastError.clear();
    return true;
}

/// Marks the adapter disconnected and rewinds the replay to its first frame.
void MetaCoreFileReplayRuntimeDataSourceAdapter::Disconnect() {
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastError.clear();
    NextFrameIndex_ = 0;
    ElapsedSeconds_ = 0.0;
}

/// Advances the replay clock and refreshes the state. Only acts while the
/// adapter is Connected or Degraded; negative @p deltaSeconds counts as 0.
void MetaCoreFileReplayRuntimeDataSourceAdapter::Tick(double deltaSeconds) {
    const bool isLive = Status_.State == MetaCoreRuntimeDataSourceState::Connected ||
                        Status_.State == MetaCoreRuntimeDataSourceState::Degraded;
    if (!isLive) {
        return;
    }

    ElapsedSeconds_ += std::max(0.0, deltaSeconds);

    if (Frames_.empty()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
        Status_.LastError = "Replay file has no frames";
        return;
    }

    if (NextFrameIndex_ >= Frames_.size()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
        Status_.LastError = "Replay stream exhausted";
    } else {
        Status_.State = MetaCoreRuntimeDataSourceState::Connected;
        Status_.LastError.clear();
    }
}

/// Returns every not-yet-emitted frame whose emit time has been reached, in
/// frame order. Each update gets the next sequence number and its frame's emit
/// time (in milliseconds) as source timestamp.
std::vector<MetaCoreRuntimeDataUpdate> MetaCoreFileReplayRuntimeDataSourceAdapter::PollUpdates() {
    const bool inactive = Status_.State == MetaCoreRuntimeDataSourceState::Disconnected ||
                          Status_.State == MetaCoreRuntimeDataSourceState::Faulted;
    if (inactive || Frames_.empty()) {
        return {};
    }

    std::vector<MetaCoreRuntimeDataUpdate> updates;
    while (NextFrameIndex_ < Frames_.size() && Frames_[NextFrameIndex_].EmitAfterSeconds <= ElapsedSeconds_) {
        const auto& frame = Frames_[NextFrameIndex_];

        MetaCoreRuntimeDataUpdate update = frame.Update;
        update.Sequence = ++Sequence_;
        update.Value.SourceTimestamp = MetaCoreSecondsToMilliseconds(frame.EmitAfterSeconds);
        Status_.LastUpdateAt = update.Value.SourceTimestamp;

        updates.push_back(std::move(update));
        ++NextFrameIndex_;
    }

    if (NextFrameIndex_ >= Frames_.size() && updates.empty()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
        Status_.LastError = "Replay stream exhausted";
    }

    return updates;
}

const MetaCoreRuntimeDataSourceStatus& MetaCoreFileReplayRuntimeDataSourceAdapter::GetStatus() const {
    return Status_;
}

/// Parses ReplayFilePath_ into Frames_.
///
/// Each non-empty line that does not start with '#' has the form
/// `<emitAfterSeconds> <dataPointId> <valueType> <value...>`.
/// Frames are ordered by emit time afterwards.
/// @return false with LastError set when the file cannot be opened or a line
///         cannot be parsed.
bool MetaCoreFileReplayRuntimeDataSourceAdapter::LoadReplayFile() {
    Frames_.clear();

    std::ifstream input(ReplayFilePath_);
    if (!input) {
        Status_.LastError = "Unable to open replay file: " + ReplayFilePath_;
        return false;
    }

    std::string line;
    std::size_t lineNumber = 0;
    while (std::getline(input, line)) {
        ++lineNumber;
        line = MetaCoreTrim(line);
        if (line.empty() || line.front() == '#') {
            continue;
        }

        std::istringstream parser(line);
        double emitAfterSeconds = 0.0;
        std::string dataPointId;
        std::string valueType;
        if (!(parser >> emitAfterSeconds >> dataPointId >> valueType)) {
            Status_.LastError = "Replay parse error at line " + std::to_string(lineNumber);
            return false;
        }

        MetaCoreRuntimeDataUpdate update;
        update.DataPointId = dataPointId;
        update.Sequence = 0;

        std::string parseError;
        if (!MetaCoreParseValuePayload(parser, valueType, update.Value, parseError)) {
            Status_.LastError = "Replay " + parseError + " at line " + std::to_string(lineNumber);
            return false;
        }

        Frames_.push_back(ReplayFrame{ emitAfterSeconds, std::move(update) });
    }

    // Stable sort: frames that share an emit time must keep their file order,
    // otherwise two writes to the same data point could be replayed reversed.
    std::stable_sort(Frames_.begin(), Frames_.end(), [](const ReplayFrame& lhs, const ReplayFrame& rhs) {
        return lhs.EmitAfterSeconds < rhs.EmitAfterSeconds;
    });
    return true;
}

// ---------------------------------------------------------------------------
// TCP adapter: reads newline-terminated "<dataPointId> <valueType> <value...>"
// lines from a non-blocking Winsock socket.
// ---------------------------------------------------------------------------

std::string MetaCoreTcpRuntimeDataSourceAdapter::GetAdapterType() const {
    return "tcp";
}

/// Stores the definition, resets all connection state and reads the "host"
/// (optional, default "127.0.0.1") and "port" (required) connection settings.
/// @return false (state Faulted) when "port" is missing, or is not a decimal
///         number in 1..65535.
bool MetaCoreTcpRuntimeDataSourceAdapter::Configure(const MetaCoreDataSourceDefinition& sourceDefinition) {
    SourceDefinition_ = sourceDefinition;
    Status_.SourceId = sourceDefinition.Id;
    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastConnectedAt = 0;
    Status_.LastUpdateAt = 0;
    Status_.LastError.clear();
    Host_ = "127.0.0.1";
    Port_ = 0;
    Sequence_ = 0;
    ElapsedSeconds_ = 0.0;
    ReceiveBuffer_.clear();
    PendingUpdates_.clear();

    if (const std::string* host = MetaCoreFindSetting(sourceDefinition, "host"); host != nullptr && !host->empty()) {
        Host_ = *host;
    }

    const std::string* portValue = MetaCoreFindSetting(sourceDefinition, "port");
    if (portValue == nullptr || portValue->empty()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "Missing port connection setting";
        return false;
    }

    // Strict parse: a plain cast of the parsed number would silently truncate
    // out-of-range values (70000 -> 4464) and accept inputs such as "80abc".
    const std::optional<std::uint16_t> parsedPort = MetaCoreParsePort(*portValue);
    if (!parsedPort.has_value()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "Invalid port connection setting";
        return false;
    }

    // decltype keeps the cast in step with the member's declared type.
    Port_ = static_cast<decltype(Port_)>(*parsedPort);
    return true;
}

/// Calls WSAStartup (version 2.2) the first time it is invoked.
/// @return false with LastError set when WSAStartup fails.
/// NOTE(review): the `initialized` flag is a plain function-local static with
/// no synchronisation, and no matching WSACleanup is visible in this file.
bool MetaCoreTcpRuntimeDataSourceAdapter::EnsureWinsockInitialized() {
    static bool initialized = false;
    if (initialized) {
        return true;
    }

    WSADATA wsaData{};
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        Status_.LastError = "WSAStartup failed";
        return false;
    }

    initialized = true;
    return true;
}

/// Resolves Host_/Port_, connects a TCP socket and switches it to non-blocking mode.
/// Every address returned by getaddrinfo is tried in order.
/// @return false (state Faulted, LastError set) when any step fails.
bool MetaCoreTcpRuntimeDataSourceAdapter::Connect() {
    Status_.State = MetaCoreRuntimeDataSourceState::Connecting;

    // A previous Connect() may have left a socket open; close it so it is not
    // leaked, and drop any partial line that belonged to the old stream.
    if (SocketHandle_ != nullptr) {
        closesocket(reinterpret_cast<SOCKET>(SocketHandle_));
        SocketHandle_ = nullptr;
        ReceiveBuffer_.clear();
    }

    if (!EnsureWinsockInitialized()) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        return false;
    }

    addrinfo hints{};
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;

    addrinfo* addresses = nullptr;
    const std::string portString = std::to_string(Port_);
    if (getaddrinfo(Host_.c_str(), portString.c_str(), &hints, &addresses) != 0 || addresses == nullptr) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "getaddrinfo failed";
        return false;
    }

    SOCKET socketHandle = MetaCoreInvalidSocket;
    const char* failureReason = "tcp connect failed";
    for (const addrinfo* address = addresses; address != nullptr; address = address->ai_next) {
        socketHandle = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
        if (socketHandle == MetaCoreInvalidSocket) {
            failureReason = "socket creation failed";
            continue;
        }

        if (connect(socketHandle, address->ai_addr, static_cast<int>(address->ai_addrlen)) != SOCKET_ERROR) {
            break;
        }

        closesocket(socketHandle);
        socketHandle = MetaCoreInvalidSocket;
        failureReason = "tcp connect failed";
    }
    freeaddrinfo(addresses);

    if (socketHandle == MetaCoreInvalidSocket) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = failureReason;
        return false;
    }

    // Tick() relies on recv() returning WSAEWOULDBLOCK instead of blocking, so
    // failing to enter non-blocking mode must fail the connection.
    u_long nonBlocking = 1;
    if (ioctlsocket(socketHandle, FIONBIO, &nonBlocking) == SOCKET_ERROR) {
        closesocket(socketHandle);
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "failed to set socket non-blocking";
        return false;
    }

    // The socket is stored type-erased in the pointer-typed SocketHandle_ member.
    SocketHandle_ = reinterpret_cast<decltype(SocketHandle_)>(socketHandle);
    Status_.State = MetaCoreRuntimeDataSourceState::Connected;
    // NOTE(review): the constant 1 looks like a placeholder timestamp — confirm.
    Status_.LastConnectedAt = 1;
    Status_.LastError.clear();
    return true;
}

/// Closes the socket (if any) and discards buffered data and pending updates.
void MetaCoreTcpRuntimeDataSourceAdapter::Disconnect() {
    if (SocketHandle_ != nullptr) {
        closesocket(reinterpret_cast<SOCKET>(SocketHandle_));
        SocketHandle_ = nullptr;
    }

    Status_.State = MetaCoreRuntimeDataSourceState::Disconnected;
    Status_.LastError.clear();
    ReceiveBuffer_.clear();
    PendingUpdates_.clear();
}

/// Advances the clock, drains the socket and turns every complete line in the
/// receive buffer into a pending update.
///
/// The clock advances even without a socket. Lines that are empty or start
/// with '#' are skipped; lines that fail to parse set the state to Degraded
/// and are dropped. A fatal recv error sets the state to Faulted.
void MetaCoreTcpRuntimeDataSourceAdapter::Tick(double deltaSeconds) {
    ElapsedSeconds_ += std::max(0.0, deltaSeconds);

    if (SocketHandle_ == nullptr || Status_.State == MetaCoreRuntimeDataSourceState::Faulted) {
        return;
    }

    const SOCKET socketHandle = reinterpret_cast<SOCKET>(SocketHandle_);
    std::array<char, MetaCoreTcpReceiveChunkBytes> buffer{};
    bool receiveFailed = false;

    // Drain the socket until it would block, the peer closes, or recv fails.
    for (;;) {
        const int received = recv(socketHandle, buffer.data(), static_cast<int>(buffer.size()), 0);
        if (received > 0) {
            ReceiveBuffer_.append(buffer.data(), static_cast<std::size_t>(received));
            Status_.State = MetaCoreRuntimeDataSourceState::Connected;
            Status_.LastError.clear();
            continue;
        }

        if (received == 0) {
            Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
            Status_.LastError = "tcp stream closed by peer";
            break;
        }

        if (WSAGetLastError() != WSAEWOULDBLOCK) {
            receiveFailed = true;
        }
        break;
    }

    // Walk the complete lines with a cursor and erase the consumed prefix once,
    // instead of erasing from the front of the buffer for every line.
    std::size_t lineStart = 0;
    for (;;) {
        const std::size_t newlineIndex = ReceiveBuffer_.find('\n', lineStart);
        if (newlineIndex == std::string::npos) {
            break;
        }

        const std::string line = MetaCoreTrim(ReceiveBuffer_.substr(lineStart, newlineIndex - lineStart));
        lineStart = newlineIndex + 1;
        if (line.empty() || line.front() == '#') {
            continue;
        }

        MetaCoreRuntimeDataUpdate update;
        if (!ParseSocketLine(line, update)) {
            Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
            if (Status_.LastError.empty()) {
                Status_.LastError = "tcp line parse failed";
            }
            continue;
        }

        PendingUpdates_.push_back(std::move(update));
    }
    ReceiveBuffer_.erase(0, lineStart);

    // What remains is one unterminated line. Bound it so a peer that never
    // sends '\n' cannot grow the buffer without limit.
    if (ReceiveBuffer_.size() > MetaCoreTcpMaxLineBytes) {
        ReceiveBuffer_.clear();
        Status_.State = MetaCoreRuntimeDataSourceState::Degraded;
        Status_.LastError = "tcp line exceeds maximum length";
    }

    // Applied last so that a parse failure on already-buffered data cannot
    // overwrite the fatal receive error with Degraded.
    if (receiveFailed) {
        Status_.State = MetaCoreRuntimeDataSourceState::Faulted;
        Status_.LastError = "tcp recv failed";
    }
}

/// Hands out all pending updates, assigning consecutive sequence numbers and
/// stamping each with the adapter's elapsed time in milliseconds.
std::vector<MetaCoreRuntimeDataUpdate> MetaCoreTcpRuntimeDataSourceAdapter::PollUpdates() {
    if (PendingUpdates_.empty()) {
        return {};
    }

    const std::uint64_t timestamp = MetaCoreSecondsToMilliseconds(ElapsedSeconds_);
    for (auto& update : PendingUpdates_) {
        update.Sequence = ++Sequence_;
        update.Value.SourceTimestamp = timestamp;
    }
    Status_.LastUpdateAt = timestamp;

    std::vector<MetaCoreRuntimeDataUpdate> updates = std::move(PendingUpdates_);
    // A moved-from vector is valid but unspecified; clear() makes it definitely empty.
    PendingUpdates_.clear();
    return updates;
}

const MetaCoreRuntimeDataSourceStatus& MetaCoreTcpRuntimeDataSourceAdapter::GetStatus() const {
    return Status_;
}

/// Parses one `<dataPointId> <valueType> <value...>` line into @p update.
/// @return false with LastError set when the header fields or the value are invalid.
bool MetaCoreTcpRuntimeDataSourceAdapter::ParseSocketLine(const std::string& line, MetaCoreRuntimeDataUpdate& update) {
    std::istringstream parser(line);
    std::string dataPointId;
    std::string valueType;
    if (!(parser >> dataPointId >> valueType)) {
        Status_.LastError = "tcp line missing header fields";
        return false;
    }

    update = MetaCoreRuntimeDataUpdate{};
    update.DataPointId = dataPointId;

    std::string parseError;
    if (!MetaCoreParseValuePayload(parser, valueType, update.Value, parseError)) {
        Status_.LastError = "tcp " + parseError;
        return false;
    }

    return true;
}

/// Creates the adapter registered under @p adapterType ("mock", "file_replay"
/// or "tcp").
/// @return the new adapter, or nullptr for an unknown type name.
MetaCoreRuntimeDataSourceAdapterPtr MetaCoreCreateRuntimeDataSourceAdapter(
    const std::string& adapterType
) {
    if (adapterType == "mock") {
        return std::make_unique<MetaCoreMockRuntimeDataSourceAdapter>();
    }
    if (adapterType == "file_replay") {
        return std::make_unique<MetaCoreFileReplayRuntimeDataSourceAdapter>();
    }
    if (adapterType == "tcp") {
        return std::make_unique<MetaCoreTcpRuntimeDataSourceAdapter>();
    }
    return nullptr;
}

} // namespace MetaCore