diff --git a/SilKit/IntegrationTests/ITest_MessageAggregation.cpp b/SilKit/IntegrationTests/ITest_MessageAggregation.cpp index 047383721..1df54f349 100644 --- a/SilKit/IntegrationTests/ITest_MessageAggregation.cpp +++ b/SilKit/IntegrationTests/ITest_MessageAggregation.cpp @@ -83,7 +83,7 @@ TEST_F(ITest_MessageAggregation, timeout_in_case_of_deadlock_when_using_async_si SilKit::Services::PubSub::PubSubSpec dataSpecPing{"ping", {}}; SilKit::Services::PubSub::PubSubSpec dataSpecPong{"pong", {}}; - bool msgReceived{false}; + std::atomic_bool msgReceived{false}; // participant with async simulation step handler & enabled message aggregation { @@ -105,8 +105,8 @@ TEST_F(ITest_MessageAggregation, timeout_in_case_of_deadlock_when_using_async_si }); timeSyncService->SetSimulationStepHandlerAsync( - [dataPublisher, lifecycleService, &msgReceived](std::chrono::nanoseconds /*now*/, - std::chrono::nanoseconds /*duration*/) { + [dataPublisher, lifecycleService, &msgReceived](std::chrono::nanoseconds, + std::chrono::nanoseconds) { // send ping std::vector ping(1, '?'); dataPublisher->Publish(std::move(ping)); @@ -135,7 +135,7 @@ TEST_F(ITest_MessageAggregation, timeout_in_case_of_deadlock_when_using_async_si }); timeSyncService->SetSimulationStepHandlerAsync( - [timeSyncService](std::chrono::nanoseconds /*now*/, std::chrono::nanoseconds /*duration*/) { + [timeSyncService](std::chrono::nanoseconds, std::chrono::nanoseconds) { timeSyncService->CompleteSimulationStep(); }, 1s); } diff --git a/SilKit/IntegrationTests/ITest_NetSimFlexRay.cpp b/SilKit/IntegrationTests/ITest_NetSimFlexRay.cpp index c96f39bf4..5a9d4fde5 100644 --- a/SilKit/IntegrationTests/ITest_NetSimFlexRay.cpp +++ b/SilKit/IntegrationTests/ITest_NetSimFlexRay.cpp @@ -3,6 +3,7 @@ // SPDX-License-Identifier: MIT #include "ITest_NetSim.hpp" +#include "ITestThreadSafeLogger.hpp" #include "silkit/services/flexray/all.hpp" namespace { @@ -23,8 +24,9 @@ struct ITest_NetSimFlexray : ITest_NetSim CallCountsSilKitHandlersFlexray& callCountsSilKitHandlersFlexray) { controller->AddCycleStartHandler( - [&callCountsSilKitHandlersFlexray](IFlexrayController*, const FlexrayCycleStartEvent& /*msg*/) { + [&callCountsSilKitHandlersFlexray](IFlexrayController*, const FlexrayCycleStartEvent& msg) { callCountsSilKitHandlersFlexray.CycleStartHandler++; + Log() << "Cycle Start: " << (int)msg.cycleCounter << " timestamp: " << msg.timestamp; }); controller->AddFrameHandler( [&callCountsSilKitHandlersFlexray](IFlexrayController*, const FlexrayFrameEvent& /*msg*/) { @@ -273,13 +275,20 @@ void MySimulatedFlexrayController::OnTxBufferUpdate( TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray) { + const auto configSynchronizationPoints = R"( +Logging: + Sinks: + - Type: Stdout + Level: Info +EnableSynchronizationPoints: true +)"; { // ---------------------------- // NetworkSimulator // ---------------------------- //auto configWithLogging = MakeParticipantConfigurationStringWithLogging(SilKit::Services::Logging::Level::Info); - auto&& simParticipant = _simTestHarness->GetParticipant(_participantNameNetSim); + auto&& simParticipant = _simTestHarness->GetParticipant(_participantNameNetSim, configSynchronizationPoints); auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService(); auto&& networkSimulator = simParticipant->GetOrCreateNetworkSimulator(); @@ -307,7 +316,8 @@ TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray) timeSyncService->SetSimulationStepHandler( [this, simulatedNetworkPtr, lifecycleService, flexrayController]( - auto now, const std::chrono::nanoseconds /*duration*/) { + auto now, const std::chrono::nanoseconds duration) { + (void)duration; if (now == _stopAtMs) { lifecycleService->Stop("stopping the simulation"); @@ -328,7 +338,7 @@ TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray) for (const auto& participantName : _participantNamesSimulated) { - auto&& simParticipant = _simTestHarness->GetParticipant(participantName); + auto&& simParticipant = _simTestHarness->GetParticipant(participantName, configSynchronizationPoints); auto&& lifecycleService = simParticipant->GetOrCreateLifecycleService(); auto&& timeSyncService = simParticipant->GetOrCreateTimeSyncService(); @@ -337,8 +347,9 @@ TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray) SetupFlexrayController(lifecycleService, flexrayController, callCounts.silKitHandlersFlexray); timeSyncService->SetSimulationStepHandler( - [this, flexrayController](auto now, const std::chrono::nanoseconds /*duration*/) { + [this, flexrayController](auto now, const std::chrono::nanoseconds duration) { OnetimeActions(now, flexrayController); + Log() << "Simulation step: " << now.count() << " : " << duration.count(); }, _stepSize); } } @@ -358,7 +369,7 @@ TEST_F(ITest_NetSimFlexray, basic_networksimulation_flexray) } } - auto ok = _simTestHarness->Run(5s); + auto ok = _simTestHarness->Run(500s); ASSERT_TRUE(ok) << "SimTestHarness should terminate without timeout"; const size_t numSimulatedFlexrayControllers = diff --git a/SilKit/source/config/ParticipantConfiguration.hpp b/SilKit/source/config/ParticipantConfiguration.hpp index e1322cb46..f8b20b557 100644 --- a/SilKit/source/config/ParticipantConfiguration.hpp +++ b/SilKit/source/config/ParticipantConfiguration.hpp @@ -379,6 +379,8 @@ struct ParticipantConfiguration : public IParticipantConfiguration Includes includes; Middleware middleware; Experimental experimental; + // experimental synchronization points + bool enableSynchronizationPoints{false}; }; bool operator==(const CanController& lhs, const CanController& rhs); diff --git a/SilKit/source/config/ParticipantConfigurationFromXImpl.cpp b/SilKit/source/config/ParticipantConfigurationFromXImpl.cpp index d2e3f7c20..b545fb372 100644 --- a/SilKit/source/config/ParticipantConfigurationFromXImpl.cpp +++ b/SilKit/source/config/ParticipantConfigurationFromXImpl.cpp @@ -96,6 +96,7 @@ struct ConfigIncludeData std::map rpcClientCache; std::map traceSinkCache; std::map traceSourceCache; + bool enableSynchronizationPoints; }; @@ -550,6 +551,8 @@ void MergeExperimentalCache(const ExperimentalCache& cache, Experimental& experi auto MergeConfigs(ConfigIncludeData& configIncludeData) -> SilKit::Config::ParticipantConfiguration { SilKit::Config::ParticipantConfiguration config; + config.enableSynchronizationPoints = configIncludeData.enableSynchronizationPoints; + for (const auto& include : configIncludeData.configBuffer) { // Merge all vectors first! @@ -666,6 +669,8 @@ auto PaticipantConfigurationWithIncludes(const std::string& text, struct ConfigI throw SilKit::ConfigurationError{fmt::format("Unknown schema version '{}' found in participant configuration!", configuration.schemaVersion)}; } + configData.enableSynchronizationPoints = configuration.enableSynchronizationPoints; + configData.configBuffer.push_back(ConfigInclude("root", configuration)); AppendToSearchPaths(configuration, configData); diff --git a/SilKit/source/config/YamlReader.cpp b/SilKit/source/config/YamlReader.cpp index 6a796eb39..766e5bd54 100644 --- a/SilKit/source/config/YamlReader.cpp +++ b/SilKit/source/config/YamlReader.cpp @@ -486,6 +486,9 @@ void YamlReader::Read(SilKit::Config::ParticipantConfiguration& obj) OptionalRead(obj.middleware, "Middleware"); OptionalRead(obj.includes, "Includes"); OptionalRead(obj.experimental, "Experimental"); + + // design proposal + OptionalRead(obj.enableSynchronizationPoints, "EnableSynchronizationPoints"); } void YamlReader::Read(SilKit::Config::HealthCheck& obj) diff --git a/SilKit/source/config/YamlWriter.cpp b/SilKit/source/config/YamlWriter.cpp index 1bcadc815..cb374c49e 100644 --- a/SilKit/source/config/YamlWriter.cpp +++ b/SilKit/source/config/YamlWriter.cpp @@ -603,6 +603,9 @@ void YamlWriter::Write(const SilKit::Config::ParticipantConfiguration& obj) NonDefaultWrite(obj.middleware, "Middleware", defaultObj.middleware); NonDefaultWrite(obj.includes, "Includes", defaultObj.includes); NonDefaultWrite(obj.experimental, "Experimental", defaultObj.experimental); + + //design proposal + NonDefaultWrite(obj.enableSynchronizationPoints, "EnableSynchronizationPoints", defaultObj.enableSynchronizationPoints); } void YamlWriter::Write(const SilKit::Config::HealthCheck& obj) diff --git a/SilKit/source/core/internal/IParticipantInternal.hpp b/SilKit/source/core/internal/IParticipantInternal.hpp index 3da426d3d..1c7b9e7cb 100644 --- a/SilKit/source/core/internal/IParticipantInternal.hpp +++ b/SilKit/source/core/internal/IParticipantInternal.hpp @@ -330,6 +330,8 @@ class IParticipantInternal : public IParticipant virtual auto GetMetricsProcessor() -> IMetricsProcessor* = 0; virtual auto GetMetricsSender() -> IMetricsSender* = 0; + + virtual auto GetConfiguration() -> const Config::ParticipantConfiguration& = 0; }; } // namespace Core diff --git a/SilKit/source/core/internal/OrchestrationDatatypes.hpp b/SilKit/source/core/internal/OrchestrationDatatypes.hpp index 109dc8340..be302f337 100644 --- a/SilKit/source/core/internal/OrchestrationDatatypes.hpp +++ b/SilKit/source/core/internal/OrchestrationDatatypes.hpp @@ -15,12 +15,26 @@ namespace SilKit { namespace Services { namespace Orchestration { +enum class SynchronizationKind: uint8_t +{ + None = 0, + RequestSynchronization = 1, + AcknowledgeSynchronization = 2, +}; + struct NextSimTask { std::chrono::nanoseconds timePoint{0}; std::chrono::nanoseconds duration{0}; + SynchronizationKind synchronizationKind{SynchronizationKind::None}; + uint64_t serialNumber{0}; }; +inline auto operator==(const NextSimTask& lhs, const NextSimTask& rhs) +{ + return lhs.duration == rhs.duration && lhs.timePoint == rhs.timePoint; +} + //! System-wide command for the simulation flow. struct SystemCommand { diff --git a/SilKit/source/core/internal/string_utils_sync.hpp b/SilKit/source/core/internal/string_utils_sync.hpp index fe77c2281..f37f611a5 100644 --- a/SilKit/source/core/internal/string_utils_sync.hpp +++ b/SilKit/source/core/internal/string_utils_sync.hpp @@ -13,6 +13,7 @@ namespace Services { namespace Orchestration { inline std::string to_string(const NextSimTask& nextTask); +inline std::string to_string(SynchronizationKind kind); inline std::string to_string(SystemCommand::Kind command); inline std::string to_string(const SystemCommand& command); @@ -31,11 +32,26 @@ std::string to_string(const NextSimTask& nextTask) return outStream.str(); } +std::string to_string(SynchronizationKind kind) +{ + switch (kind) + { + case SynchronizationKind::None: + return "None"; + case SynchronizationKind::RequestSynchronization: + return "RequestSynchronization"; + case SynchronizationKind::AcknowledgeSynchronization: + return "AcknowledgeSynchronization"; + default: + return {}; + } +} std::ostream& operator<<(std::ostream& out, const NextSimTask& nextTask) { auto tp = std::chrono::duration_cast>(nextTask.timePoint); auto duration = std::chrono::duration_cast>(nextTask.duration); - out << "Orchestration::NextSimTask{tp=" << tp.count() << "ms, duration=" << duration.count() << "ms}"; + out << "NextSimTask{tp=" << tp.count() << "ms, duration=" << duration.count() + << "ms, serial=" << nextTask.serialNumber << ", kind=" << to_string(nextTask.synchronizationKind) << "}"; return out; } diff --git a/SilKit/source/core/internal/traits/SilKitMsgTraits.hpp b/SilKit/source/core/internal/traits/SilKitMsgTraits.hpp index f04e83f31..8c586482c 100644 --- a/SilKit/source/core/internal/traits/SilKitMsgTraits.hpp +++ b/SilKit/source/core/internal/traits/SilKitMsgTraits.hpp @@ -14,6 +14,8 @@ namespace Core { // ================================================================== // Trait which checks that '.timestamp' works // ================================================================== +template +using RemoveCvRef = std::remove_cv_t>; template struct HasTimestamp : std::false_type @@ -58,6 +60,15 @@ struct SilKitMsgTraitForbidSelfDelivery } }; +template +struct SilKitMsgTraitIsSynchronizationPoint +{ + static constexpr bool IsSynchronizationPoint() + { + return false; + } +}; + // The final message traits template struct SilKitMsgTraits @@ -67,6 +78,7 @@ struct SilKitMsgTraits , SilKitMsgTraitVersion , SilKitMsgTraitSerdesName , SilKitMsgTraitForbidSelfDelivery + , SilKitMsgTraitIsSynchronizationPoint { }; @@ -110,6 +122,16 @@ struct SilKitMsgTraits } \ } +#define DefineSilKitMsgTrait_IsSynchronizationPoint(Namespace, MsgName) \ + template <> \ + struct SilKitMsgTraitIsSynchronizationPoint \ + { \ + static constexpr bool IsSynchronizationPoint() \ + { \ + return true; \ + } \ + } + DefineSilKitMsgTrait_TypeName(SilKit::Services::Logging, LogMsg); DefineSilKitMsgTrait_TypeName(VSilKit, MetricsUpdate); DefineSilKitMsgTrait_TypeName(SilKit::Services::Orchestration, SystemCommand); @@ -164,5 +186,9 @@ DefineSilKitMsgTrait_EnforceSelfDelivery(SilKit::Services::Lin, LinSendFrameHead // Messages with forbidden self delivery DefineSilKitMsgTrait_ForbidSelfDelivery(SilKit::Services::Orchestration, SystemCommand); +// Messages which are Synchronization Points +DefineSilKitMsgTrait_IsSynchronizationPoint(SilKit::Services::Flexray, FlexrayCycleStartEvent); +DefineSilKitMsgTrait_IsSynchronizationPoint(SilKit::Services::PubSub, WireDataMessageEvent); //for testing + } // namespace Core } // namespace SilKit diff --git a/SilKit/source/core/mock/participant/MockParticipant.hpp b/SilKit/source/core/mock/participant/MockParticipant.hpp index 611a852e0..b962f6f7e 100644 --- a/SilKit/source/core/mock/participant/MockParticipant.hpp +++ b/SilKit/source/core/mock/participant/MockParticipant.hpp @@ -242,7 +242,7 @@ class DummyMetricsManager : public IMetricsManager std::unordered_map _attributes; }; -class DummyParticipant : public IParticipantInternal +class DummyParticipant: public IParticipantInternal { public: DummyParticipant() @@ -250,6 +250,7 @@ class DummyParticipant : public IParticipantInternal ON_CALL(mockLifecycleService, GetTimeSyncService).WillByDefault(testing::Return(&mockTimeSyncService)); ON_CALL(mockLifecycleService, CreateTimeSyncService).WillByDefault(testing::Return(&mockTimeSyncService)); ON_CALL(logger, GetLogLevel()).WillByDefault(testing::Return(Services::Logging::Level::Debug)); + ON_CALL(*this, GetConfiguration()).WillByDefault(testing::ReturnRef(_participantConfiguration)); } auto CreateCanController(const std::string& /*canonicalName*/, @@ -720,6 +721,8 @@ class DummyParticipant : public IParticipantInternal return nullptr; } + MOCK_METHOD(const Config::ParticipantConfiguration&, GetConfiguration, (), (override)); + const std::string _name = "MockParticipant"; const std::string _registryUri = "silkit://mock.participant.silkit:0"; testing::NiceMock logger; @@ -733,6 +736,8 @@ class DummyParticipant : public IParticipantInternal MockParticipantReplies mockParticipantReplies; DummyNetworkSimulator mockNetworkSimulator; DummyMetricsManager mockMetricsManager; + Config::ParticipantConfiguration _participantConfiguration; + }; // ================================================================================ diff --git a/SilKit/source/core/participant/Participant.hpp b/SilKit/source/core/participant/Participant.hpp index 8f864b597..026ea5990 100644 --- a/SilKit/source/core/participant/Participant.hpp +++ b/SilKit/source/core/participant/Participant.hpp @@ -81,7 +81,7 @@ namespace SilKit { namespace Core { template -class Participant final : public IParticipantInternal +class Participant final: public IParticipantInternal { public: // ---------------------------------------- @@ -440,6 +440,11 @@ class Participant final : public IParticipantInternal auto MakeTimerThread() -> std::unique_ptr; + auto GetConfiguration() -> const Config::ParticipantConfiguration& override; + + template + void HandleSynchronizationPoint(const IServiceEndpoint* service); + private: // ---------------------------------------- // private members diff --git a/SilKit/source/core/participant/Participant_impl.hpp b/SilKit/source/core/participant/Participant_impl.hpp index 0010dcc57..8007ec5cc 100644 --- a/SilKit/source/core/participant/Participant_impl.hpp +++ b/SilKit/source/core/participant/Participant_impl.hpp @@ -53,6 +53,7 @@ #include "Uuid.hpp" #include "Assert.hpp" #include "ExecutionEnvironment.hpp" +#include "traits/SilKitMsgTraits.hpp" #include "fmt/ranges.h" @@ -106,6 +107,7 @@ Participant::Participant(Config::ParticipantConfiguration par lm.SetKeyValue(Logging::Keys::participantName, GetParticipantName()); lm.SetKeyValue(Logging::Keys::registryUri, _participantConfig.middleware.registryUri); lm.SetKeyValue(Logging::Keys::silKitVersion, Version::StringImpl()); + lm.SetKeyValue("GitHash", Version::GitHashImpl()); lm.Dispatch(); } @@ -1323,14 +1325,37 @@ void Participant::SendMsg(const IServiceEndpoint* from, SendMsgImpl(from, std::move(msg)); } +template +template +void Participant::HandleSynchronizationPoint(const IServiceEndpoint* service) +{ + if constexpr (SilKitMsgTraits>::IsSynchronizationPoint()) + { + if (auto* lifecycle = static_cast(GetLifecycleService()); lifecycle) + { + if (auto* timesync = static_cast(lifecycle->GetTimeSyncService()); + timesync) + { + if(_participantConfig.enableSynchronizationPoints) + { + const auto& serdesName = SilKitMsgTraits>::SerdesName(); + const auto numReceivers = _connection.GetNumberOfRemoteReceivers(service, serdesName); + timesync->TriggerSynchronization(numReceivers); + } + } + } + } + +} + template template void Participant::SendMsgImpl(const IServiceEndpoint* from, SilKitMessageT&& msg) { TraceTx(GetLoggerInternal(), from, msg); _connection.SendMsg(from, std::forward(msg)); + HandleSynchronizationPoint(from); } - // Targeted messaging template void Participant::SendMsg(const IServiceEndpoint* from, const std::string& targetParticipantName, @@ -1633,6 +1658,7 @@ void Participant::SendMsgImpl(const IServiceEndpoint* from, c { TraceTx(GetLoggerInternal(), from, targetParticipantName, msg); _connection.SendMsg(from, targetParticipantName, std::forward(msg)); + HandleSynchronizationPoint(from); } @@ -2024,6 +2050,11 @@ auto Participant::MakeTimerThread() -> std::unique_ptrSubmitUpdates(); }); }); } +template +auto Participant::GetConfiguration() -> const Config::ParticipantConfiguration& +{ + return _participantConfig; +} } // namespace Core diff --git a/SilKit/source/core/participant/Test_Participant.cpp b/SilKit/source/core/participant/Test_Participant.cpp index 6f329d08c..28aa3cf0d 100644 --- a/SilKit/source/core/participant/Test_Participant.cpp +++ b/SilKit/source/core/participant/Test_Participant.cpp @@ -8,6 +8,20 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +#include "WireDataMessages.hpp" +#include "WireCanMessages.hpp" +#include "WireEthernetMessages.hpp" +#include "WireLinMessages.hpp" +#include "WireFlexrayMessages.hpp" +#include "WireRpcMessages.hpp" +#include "LoggingDatatypesInternal.hpp" +#include "OrchestrationDatatypes.hpp" +#include "ServiceDatatypes.hpp" +#include "RequestReplyDatatypes.hpp" +#include "MetricsDatatypes.hpp" + +#include "traits/SilKitMsgTraits.hpp" + #include "NullConnectionParticipant.hpp" #include "CanController.hpp" #include "ConfigurationTestUtils.hpp" @@ -24,6 +38,13 @@ class Test_Participant : public testing::Test Test_Participant() {} }; +TEST(Test_Traits, ensure_traits) +{ + EXPECT_EQ(SilKitMsgTraits::TypeName(), + std::string{"SilKit::Services::PubSub::WireDataMessageEvent"}); + EXPECT_TRUE(SilKitMsgTraits::IsSynchronizationPoint()); +} + TEST_F(Test_Participant, throw_on_empty_participant_name) { EXPECT_THROW(CreateNullConnectionParticipantImpl(SilKit::Config::MakeEmptyParticipantConfigurationImpl(), ""), diff --git a/SilKit/source/core/vasio/VAsioConnection.hpp b/SilKit/source/core/vasio/VAsioConnection.hpp index 3dda909df..409074b4a 100644 --- a/SilKit/source/core/vasio/VAsioConnection.hpp +++ b/SilKit/source/core/vasio/VAsioConnection.hpp @@ -397,6 +397,14 @@ class VAsioConnection { const auto& key = from->GetServiceDescriptor().GetNetworkName(); + if constexpr (std::is_same_v>) + { + if(msg.duration == std::chrono::nanoseconds{0}) + { + (void)msg; //DEBUG break point here + } + } + auto& linkMap = std::get>>(_serviceToLinkMap); if (linkMap.count(key) < 1) { diff --git a/SilKit/source/services/orchestration/SyncSerdes.cpp b/SilKit/source/services/orchestration/SyncSerdes.cpp index 7bac2f4a8..ac1ce043c 100644 --- a/SilKit/source/services/orchestration/SyncSerdes.cpp +++ b/SilKit/source/services/orchestration/SyncSerdes.cpp @@ -12,13 +12,17 @@ namespace Orchestration { inline SilKit::Core::MessageBuffer& operator<<(SilKit::Core::MessageBuffer& buffer, const SilKit::Services::Orchestration::NextSimTask& task) { - buffer << task.timePoint << task.duration; + buffer << task.timePoint << task.duration << task.synchronizationKind << task.serialNumber; return buffer; } inline SilKit::Core::MessageBuffer& operator>>(SilKit::Core::MessageBuffer& buffer, SilKit::Services::Orchestration::NextSimTask& task) { buffer >> task.timePoint >> task.duration; + if(buffer.RemainingBytesLeft() > 0) + { + buffer >> task.synchronizationKind >> task.serialNumber; + } return buffer; } diff --git a/SilKit/source/services/orchestration/TimeConfiguration.cpp b/SilKit/source/services/orchestration/TimeConfiguration.cpp index 98acf56c3..cc1bd7ab5 100644 --- a/SilKit/source/services/orchestration/TimeConfiguration.cpp +++ b/SilKit/source/services/orchestration/TimeConfiguration.cpp @@ -77,7 +77,7 @@ void TimeConfiguration::OnReceiveNextSimStep(const std::string& participantName, { Logging::Error( _logger, - "Chonology error: Received NextSimTask from participant \'{}\' with lower timePoint {} than last " + "Chronology error: Received NextSimTask from participant '{}' with lower timePoint {} than last " "known timePoint {}", participantName, nextStep.timePoint.count(), itOtherNextTask->second.timePoint.count()); } diff --git a/SilKit/source/services/orchestration/TimeConfiguration.hpp b/SilKit/source/services/orchestration/TimeConfiguration.hpp index 59157c963..da5bd8f4b 100755 --- a/SilKit/source/services/orchestration/TimeConfiguration.hpp +++ b/SilKit/source/services/orchestration/TimeConfiguration.hpp @@ -35,6 +35,14 @@ class TimeConfiguration void Initialize(); bool IsBlocking() const; + auto GetNumberOfOtherParticipants() const -> size_t + { + Lock lock{_mx}; + return _otherNextTasks.size(); + } + + + bool ShouldResendNextSimStep(); // Returns true (only once) in the step the actual hop-on happened diff --git a/SilKit/source/services/orchestration/TimeSyncService.cpp b/SilKit/source/services/orchestration/TimeSyncService.cpp index d21b9888e..329ba012e 100644 --- a/SilKit/source/services/orchestration/TimeSyncService.cpp +++ b/SilKit/source/services/orchestration/TimeSyncService.cpp @@ -5,6 +5,9 @@ #include #include #include +#include +#include +#include #include "silkit/services/orchestration/string_utils.hpp" #include "silkit/services/orchestration/ISystemMonitor.hpp" @@ -30,6 +33,18 @@ auto GetDefaultTimerResolution() -> std::chrono::nanoseconds return 1ms; } #endif + +auto GetRandomSerialNumber() -> uint64_t +{ + thread_local static std::mt19937_64 gen(std::random_device{}()); + thread_local static std::uniform_int_distribution dist(1, std::numeric_limits::max()); + return dist(gen); +} +auto GetCurrentTimeMillis() +{ + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()); +} } // namespace namespace SilKit { @@ -46,6 +61,7 @@ struct ITimeSyncPolicy virtual auto IsExecutingSimStep() -> bool = 0; virtual void ReceiveNextSimTask(const Core::IServiceEndpoint* from, const NextSimTask& task) = 0; virtual void ProcessSimulationTimeUpdate() = 0; + virtual void TriggerSynchronization(size_t numberOfRemoteReceivers) = 0; }; //! brief Synchronization policy for unsynchronized participants @@ -56,29 +72,29 @@ struct UnsynchronizedPolicy : public ITimeSyncPolicy void Initialize() override {} void RequestNextStep() override {} void SetSimStepCompleted() override {} - auto IsExecutingSimStep() -> bool override + bool IsExecutingSimStep() override { return false; } void ReceiveNextSimTask(const Core::IServiceEndpoint* /*from*/, const NextSimTask& /*task*/) override {} - void ProcessSimulationTimeUpdate() override {}; + void ProcessSimulationTimeUpdate() override {} + void TriggerSynchronization(size_t /*numberOfRemoteReceivers*/) override {} }; //! brief Synchronization policy of the VAsio middleware struct SynchronizedPolicy : public ITimeSyncPolicy { public: - SynchronizedPolicy(TimeSyncService& controller, Core::IParticipantInternal* participant, - TimeConfiguration* configuration) - : _controller(controller) - , _participant(participant) - , _configuration(configuration) + SynchronizedPolicy(TimeSyncService& controller, Core::IParticipantInternal* participant) + : _controller{controller} + , _participant{participant} + , _enableSynchronizationPoints{participant->GetConfiguration().enableSynchronizationPoints} { } void Initialize() override { - _configuration->Initialize(); + _controller.GetTimeConfiguration()->Initialize(); } void SetSimStepCompleted() override @@ -99,10 +115,10 @@ struct SynchronizedPolicy : public ITimeSyncPolicy && !_controller.PauseRequested()) { if (_lastSentNextSimTask - != _configuration->NextSimStep().timePoint) // Prevent sending same step more than once + != _controller.GetTimeConfiguration()->NextSimStep().timePoint) // Prevent sending same step more than once { - _lastSentNextSimTask = _configuration->NextSimStep().timePoint; - _controller.SendMsg(_configuration->NextSimStep()); + _lastSentNextSimTask = _controller.GetTimeConfiguration()->NextSimStep().timePoint; + _controller.SendMsg(_controller.GetTimeConfiguration()->NextSimStep()); } // Bootstrap checked execution, in case there is no other participant. // Else, checked execution is initiated when we receive their NextSimTask messages. @@ -110,9 +126,127 @@ struct SynchronizedPolicy : public ITimeSyncPolicy } } + void TriggerSynchronization(size_t numberOfRemoteReceivers) override + { + if(numberOfRemoteReceivers < 1) + { + return; + } + + const auto serialNumber = GetRandomSerialNumber(); + auto&& transaction = _transactions[serialNumber]; + + transaction.isInitialized = true; + transaction.requester = &_controller; + transaction.synchronizationRequested = true; + transaction.numberOfAcknowledgesReceived = 0; + transaction.numberOfExpectedAcknowledges = numberOfRemoteReceivers; + transaction.wallclockStartTime = GetCurrentTimeMillis(); + transaction.serialNumber = serialNumber; + + + auto&& currentStep = _controller.GetTimeConfiguration()->CurrentSimStep(); + const NextSimTask zeroStep{currentStep.timePoint, 0ns, SynchronizationKind::RequestSynchronization, + serialNumber}; + _controller.SendMsg( zeroStep); + Logging::Info(_participant->GetLogger(), "Started transaction {}", serialNumber); + } + void ReceiveNextSimTask(const Core::IServiceEndpoint* from, const NextSimTask& task) override { - _configuration->OnReceiveNextSimStep(from->GetServiceDescriptor().GetParticipantName(), task); + auto isTransaction = [](auto&& simTask) { + return simTask.serialNumber > 0 && simTask.synchronizationKind != SynchronizationKind::None; + }; + + if (_enableSynchronizationPoints) + { + if (isTransaction(task)) + { + switch (task.synchronizationKind) + { + case SynchronizationKind::RequestSynchronization: + //if (transaction.requester != &_controller) + { + auto&& currentStep = _controller.GetTimeConfiguration()->CurrentSimStep(); + // trigger local callback + _controller.ExecuteSimStep(currentStep.timePoint, 0ns); + // send acknowledge for synchronization request + _controller.SendMsg(NextSimTask{currentStep.timePoint, 0ns, + SynchronizationKind::AcknowledgeSynchronization, + task.serialNumber}); + //Logging::Info(_participant->GetLogger(), "Acknowledging transaction {}", task.serialNumber); + return; + } + //else + //{ + //Logging::Info(_participant->GetLogger(), "Doing nothing transaction {}", task.serialNumber); + // return; // do nothing until acknowledges arrive + //} + break; + case SynchronizationKind::AcknowledgeSynchronization: + //if (transaction.synchronizationRequested) + if (_transactions.count(task.serialNumber) > 0) + { + auto&& transaction = _transactions[task.serialNumber]; + transaction.numberOfAcknowledgesReceived++; + if (transaction.numberOfAcknowledgesReceived + >= transaction.numberOfExpectedAcknowledges) + { + // All participants have acknowledged the synchronization request, we can continue with the next step + auto duration = GetCurrentTimeMillis() - transaction.wallclockStartTime; + Logging::Info(_participant->GetLogger(), "Completed transaction {} duration {}", task.serialNumber, duration); + transaction = {}; // reset state and run NextSimTask as usual + _transactions.erase(task.serialNumber); + } + else + { + //Logging::Info(_participant->GetLogger(), "Waiting for more acks transaction {}", task.serialNumber); + return; // wait until all acknowledges are received before triggering the next step + } + } + break; + default: + break; + } + } + else + { + // cache a non-transaction NextSimTask until all transactions are completed, then process it as usual + _deferredSimTasks[from->GetServiceDescriptor().GetParticipantName()] = task; + } + } + if (_enableSynchronizationPoints && _transactions.size() > 0) + { + //Logging::Info(_participant->GetLogger(), "Not advancing time because of {} outstanding transactions", + // _transactions.size()); + return; + } + + if (isTransaction(task)) + { + if (!_enableSynchronizationPoints) + { + Logging::Info(_participant->GetLogger(), "Synchronization Points disabled, ignoring {}", task); + return; + } + for(auto&& [participantName, deferredSimTask]: _deferredSimTasks) + { + Logging::Info(_participant->GetLogger(), "No transactions active, processing {}", deferredSimTask); + // process the cached non-transaction NextSimTask if it exists + _controller.GetTimeConfiguration()->OnReceiveNextSimStep(participantName, deferredSimTask); + } + _deferredSimTasks.clear(); + } + else + { + // normal operation + if (task.serialNumber > 0 || task.synchronizationKind != SynchronizationKind::None) + { + throw SilKitError{"Transaction error in NextSimTask"}; + } + _controller.GetTimeConfiguration()->OnReceiveNextSimStep(from->GetServiceDescriptor().GetParticipantName(), + task); + } switch (_controller.State()) { @@ -164,10 +298,25 @@ struct SynchronizedPolicy : public ITimeSyncPolicy } } + struct ActiveTransaction + { + bool isInitialized{false}; + uint64_t serialNumber{0}; + std::chrono::milliseconds wallclockStartTime{0}; + size_t numberOfAcknowledgesReceived{0}; + size_t numberOfExpectedAcknowledges{0}; + Core::IServiceEndpoint* requester{nullptr}; + bool synchronizationRequested{false}; + }; + + std::unordered_map _transactions; + std::unordered_map _deferredSimTasks; + + private: bool IsSimStepSync() const { - return _configuration->IsBlocking(); + return _controller.GetTimeConfiguration()->IsBlocking(); } bool IsTimeAdvancePossible() @@ -192,7 +341,7 @@ struct SynchronizedPolicy : public ITimeSyncPolicy return false; } - if (_configuration->OtherParticipantHasLowerTimepoint()) + if (_controller.GetTimeConfiguration()->OtherParticipantHasLowerTimepoint()) { return false; } @@ -200,7 +349,7 @@ struct SynchronizedPolicy : public ITimeSyncPolicy // With real-time sync, the time advance is not possible if less then current real-time. if (_controller.IsCoupledToWallClock()) { - if (_configuration->NextSimStep().timePoint > _controller.GetCurrentWallClockSyncPoint()) + if (_controller.GetTimeConfiguration()->NextSimStep().timePoint > _controller.GetCurrentWallClockSyncPoint()) { return false; } @@ -254,7 +403,7 @@ struct SynchronizedPolicy : public ITimeSyncPolicy if (!_hopOnEvaluated) { _hopOnEvaluated = true; - if (_configuration->IsHopOn()) + if (_controller.GetTimeConfiguration()->IsHopOn()) { if (_controller.AbortHopOnForCoordinatedParticipants()) { @@ -265,14 +414,14 @@ struct SynchronizedPolicy : public ITimeSyncPolicy if (_controller.IsCoupledToWallClock()) { // Start the wall clock coupling thread, possibly with an offset in case of an hop-on. - _controller.StartWallClockCouplingThread(_configuration->NextSimStep().timePoint); + _controller.StartWallClockCouplingThread(_controller.GetTimeConfiguration()->NextSimStep().timePoint); } } // update the current and next sim. step timestamps - _configuration->AdvanceTimeStep(); + _controller.GetTimeConfiguration()->AdvanceTimeStep(); // Execute the simulation step callback with the current simulation time - auto currentStep = _configuration->CurrentSimStep(); + auto currentStep = _controller.GetTimeConfiguration()->CurrentSimStep(); _controller.ExecuteSimStep(currentStep.timePoint, currentStep.duration); } } @@ -280,9 +429,9 @@ struct SynchronizedPolicy : public ITimeSyncPolicy std::atomic _isExecutingSimStep{false}; TimeSyncService& _controller; std::chrono::nanoseconds _lastSentNextSimTask{-1ns}; - Core::IParticipantInternal* _participant; - TimeConfiguration* _configuration; - bool _hopOnEvaluated = false; + Core::IParticipantInternal* _participant{nullptr}; + bool _hopOnEvaluated{false}; + bool _enableSynchronizationPoints{false}; }; TimeSyncService::TimeSyncService(Core::IParticipantInternal* participant, ITimeProvider* timeProvider, @@ -470,7 +619,7 @@ bool TimeSyncService::SetupTimeSyncPolicy(bool isSynchronizingVirtualTime) _timeSyncConfigured = true; if (isSynchronizingVirtualTime) { - _timeSyncPolicy = std::make_shared(*this, _participant, &_timeConfiguration); + _timeSyncPolicy = std::make_shared(*this, _participant); } else { @@ -813,6 +962,13 @@ void TimeSyncService::InvokeOtherSimulationStepsCompletedHandlers() _otherSimulationStepsCompletedHandlers.InvokeAll(); } +void TimeSyncService::TriggerSynchronization(size_t numberOfRemoteReceivers) +{ + GetTimeSyncPolicy()->TriggerSynchronization(numberOfRemoteReceivers); +} + + + void TimeSyncService::StopWallClockCouplingThread() { if (_wallClockCouplingThreadRunning) diff --git a/SilKit/source/services/orchestration/TimeSyncService.hpp b/SilKit/source/services/orchestration/TimeSyncService.hpp index 9cf516c08..d35efb44b 100644 --- a/SilKit/source/services/orchestration/TimeSyncService.hpp +++ b/SilKit/source/services/orchestration/TimeSyncService.hpp @@ -99,6 +99,10 @@ class TimeSyncService void RemoveOtherSimulationStepsCompletedHandler(HandlerId handlerId); void InvokeOtherSimulationStepsCompletedHandlers(); + + // synchronization point design proposal + void TriggerSynchronization(size_t numberOfRemoteReceivers); + private: // ---------------------------------------- // private methods