diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h index 141e271b2125af3960fbd08a18b9facc38efd33e..409399585c5a45aeb7364351425c20674b4a6d39 100755 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter.h @@ -34,35 +34,42 @@ enum IdType { UUID, UDID, }; -class Semaphore { -public: - explicit Semaphore(unsigned int resCount) : count(resCount), data(-1) {} - ~Semaphore() {} +template +class BlockData { +public: + explicit BlockData() {} + ~BlockData() {} public: - int Wait() + void SetValue(T &data) { - std::unique_lock uniqueLock(mutex); - --count; - while (count < 0) { - cv.wait(uniqueLock); - } + std::lock_guard lock(mutex_); + data_ = data; + isSet_ = true; + cv_.notify_one(); + } + + T GetValue() + { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this]() { return isSet_; }); + T data = data_; + cv_.notify_one(); return data; } - void Signal(const int &sendData) + + void Clear() { - std::lock_guard lg(mutex); - data = sendData; - if (++count < 1) { - cv.notify_one(); - } + std::lock_guard lock(mutex_); + isSet_ = false; + cv_.notify_one(); } private: - int count; - int data; - std::mutex mutex; - std::condition_variable cv; + bool isSet_ = false; + T data_; + std::mutex mutex_; + std::condition_variable cv_; }; class SoftBusAdapter { @@ -124,14 +131,13 @@ public: void NotifyDataListeners(const uint8_t *ptr, const int size, const std::string &deviceId, const PipeInfo &pipeInfo); - int WaitSessionOpen(); - - void NotifySessionOpen(const int &state); + int32_t GetSessionStatus(int32_t sessionId); - int GetOpenSessionId(); + void OnSessionOpen(int32_t sessionId, int32_t status); - void SetOpenSessionId(const int &sessionId); + void OnSessionClose(int32_t sessionId); private: + std::shared_ptr> GetSemaphore (int32_t sessinId); mutable std::mutex networkMutex_ {}; mutable std::map> networkId2UuidUdid_ {}; DeviceInfo localInfo_ {}; @@ -145,11 +151,8 @@ private: bool flag_ = true; // only for br flag INodeStateCb nodeStateCb_ {}; ISessionListener sessionListener_ {}; - std::unique_ptr semaphore_ {}; - std::mutex notifyFlagMutex_ {}; - bool notifyFlag_ = false; - std::mutex openSessionIdMutex_ {}; - int openSessionId_ = -1; + std::mutex statusMutex_ {}; + std::map>> sessionsStatus_; }; } // namespace AppDistributedKv } // namespace OHOS diff --git a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp index 529866b57bde657da6a149181f8a19ef0bd64dcf..758bcb12198d03a80fc2909d51351d58311aea22 100755 --- a/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp +++ b/services/distributeddataservice/adapter/communicator/src/softbus_adapter_standard.cpp @@ -127,8 +127,6 @@ SoftBusAdapter::SoftBusAdapter() sessionListener_.OnSessionClosed = AppDataListenerWrap::OnSessionClosed; sessionListener_.OnBytesReceived = AppDataListenerWrap::OnBytesReceived; sessionListener_.OnMessageReceived = AppDataListenerWrap::OnMessageReceived; - - semaphore_ = std::make_unique(0); } SoftBusAdapter::~SoftBusAdapter() @@ -510,13 +508,8 @@ Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &device pipeInfo.pipeId.c_str(), info.msgType, sessionId); return Status::CREATE_SESSION_ERROR; } - SetOpenSessionId(sessionId); - int state = WaitSessionOpen(); - { - lock_guard lock(notifyFlagMutex_); - notifyFlag_ = false; - } - ZLOGD("Waited for notification, state:%{public}d", state); + int state = GetSessionStatus(sessionId); + ZLOGD("waited for notification, state:%{public}d", state); if (state != SOFTBUS_OK) { ZLOGE("OpenSession callback result error"); return Status::CREATE_SESSION_ERROR; @@ -531,6 +524,37 @@ Status SoftBusAdapter::SendData(const PipeInfo &pipeInfo, const DeviceId &device return Status::SUCCESS; } +int32_t SoftBusAdapter::GetSessionStatus(int32_t sessionId) +{ + auto semaphore = GetSemaphore(sessionId); + return semaphore->GetValue(); +} + +void SoftBusAdapter::OnSessionOpen(int32_t sessionId, int32_t status) +{ + auto semaphore = GetSemaphore(sessionId); + semaphore->SetValue(status); +} + +void SoftBusAdapter::OnSessionClose(int32_t sessionId) +{ + lock_guard lock(statusMutex_); + auto it = sessionsStatus_.find(sessionId); + if (it != sessionsStatus_.end()) { + it->second->Clear(); + sessionsStatus_.erase(it); + } +} + +std::shared_ptr> SoftBusAdapter::GetSemaphore(int32_t sessionId) +{ + lock_guard lock(statusMutex_); + if (sessionsStatus_.find(sessionId) == sessionsStatus_.end()) { + sessionsStatus_.emplace(sessionId, std::make_shared>()); + } + return sessionsStatus_[sessionId]; +} + bool SoftBusAdapter::IsSameStartedOnPeer(const struct PipeInfo &pipeInfo, __attribute__((unused))const struct DeviceId &peer) { @@ -605,37 +629,6 @@ void SoftBusAdapter::NotifyDataListeners(const uint8_t *ptr, const int size, con ZLOGW("no listener %{public}s.", pipeInfo.pipeId.c_str()); } -int SoftBusAdapter::WaitSessionOpen() -{ - { - lock_guard lock(notifyFlagMutex_); - if (notifyFlag_) { - ZLOGD("already notified return"); - return 0; - } - } - return semaphore_->Wait(); -} - -void SoftBusAdapter::NotifySessionOpen(const int &state) -{ - semaphore_->Signal(state); - lock_guard lock(notifyFlagMutex_); - notifyFlag_ = true; -} - -int SoftBusAdapter::GetOpenSessionId() -{ - lock_guard lock(openSessionIdMutex_); - return openSessionId_; -} - -void SoftBusAdapter::SetOpenSessionId(const int &sessionId) -{ - lock_guard lock(openSessionIdMutex_); - openSessionId_ = sessionId; -} - void AppDataListenerWrap::SetDataHandler(SoftBusAdapter *handler) { ZLOGI("begin"); @@ -649,9 +642,7 @@ int AppDataListenerWrap::OnSessionOpened(int sessionId, int result) char peerSessionName[SESSION_NAME_SIZE_MAX] = ""; char peerDevId[DEVICE_ID_SIZE_MAX] = ""; - if (sessionId == softBusAdapter_->GetOpenSessionId()) { - softBusAdapter_->NotifySessionOpen(result); - } + softBusAdapter_->OnSessionOpen(sessionId, result); if (result != SOFTBUS_OK) { ZLOGW("session %{public}d open failed, result:%{public}d.", sessionId, result); return result; @@ -690,6 +681,7 @@ void AppDataListenerWrap::OnSessionClosed(int sessionId) char peerSessionName[SESSION_NAME_SIZE_MAX] = ""; char peerDevId[DEVICE_ID_SIZE_MAX] = ""; + softBusAdapter_->OnSessionClose(sessionId); int ret = GetMySessionName(sessionId, mySessionName, sizeof(mySessionName)); if (ret != SOFTBUS_OK) { ZLOGW("get my session name failed, session id is %{public}d.", sessionId);