diff --git a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h index 20a1c15be64a8e87916e4f28e75b70280d4a6d5b..e2d1b690bfc4c27b9e6bb145a968f9f731596fe9 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/include/ikvstore_single.h @@ -56,6 +56,9 @@ public: SETCAPABILITYENABLED, SETCAPABILITYRANGE, SETSECURITLEVEL, + SYNC_WITH_CONDITION, + SUBSCRIBE_WITH_QUERY, + UNSUBSCRIBE_WITH_QUERY, SINGLE_CMD_LAST, }; DECLARE_INTERFACE_DESCRIPTOR(u"OHOS.DistributedKv.ISingleKvStore") @@ -72,8 +75,7 @@ public: std::function)> callback) = 0; virtual Status CloseResultSet(sptr resultSet) = 0; virtual Status GetCountWithQuery(const std::string &query, int &result) = 0; - virtual Status Sync(const std::vector &deviceIdList, const SyncMode &mode, - uint32_t allowedDelayMs) = 0; + virtual Status Sync(const std::vector &deviceIdList, SyncMode mode, uint32_t allowedDelayMs) = 0; virtual Status RemoveDeviceData(const std::string &device) = 0; virtual Status RegisterSyncCallback(sptr callback) = 0; virtual Status UnRegisterSyncCallback() = 0; @@ -87,6 +89,9 @@ public: virtual Status SetCapabilityRange(const std::vector &localLabels, const std::vector &remoteSupportLabels) = 0; virtual Status GetSecurityLevel(SecurityLevel &securityLevel) = 0; + virtual Status Sync(const std::vector &deviceIdList, SyncMode mode, const std::string &query) = 0; + virtual Status SubscribeWithQuery(const std::vector &deviceIdList, const std::string &query) = 0; + virtual Status UnSubscribeWithQuery(const std::vector &deviceIdList, const std::string &query) = 0; }; class SingleKvStoreStub : public IRemoteStub { @@ -94,32 +99,35 @@ public: virtual int OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply, MessageOption &option) override; private: - int PutOnRemote(MessageParcel& data, MessageParcel& reply); - int DeleteOnRemote(MessageParcel& data, MessageParcel& reply); - int GetOnRemote(MessageParcel& data, MessageParcel& reply); - int SubscribeKvStoreOnRemote(MessageParcel& data, MessageParcel& reply); - int UnSubscribeKvStoreOnRemote(MessageParcel& data, MessageParcel& reply); - int GetEntriesOnRemote(MessageParcel& data, MessageParcel& reply); - int GetEntriesWithQueryOnRemote(MessageParcel& data, MessageParcel& reply); - int SyncOnRemote(MessageParcel& data, MessageParcel& reply); - int GetResultSetOnRemote(MessageParcel& data, MessageParcel& reply); - int GetResultSetWithQueryOnRemote(MessageParcel& data, MessageParcel& reply); - int GetCountWithQueryOnRemote(MessageParcel& data, MessageParcel& reply); - int CloseResultSetOnRemote(MessageParcel& data, MessageParcel& reply); - int RemoveDeviceDataOnRemote(MessageParcel& data, MessageParcel& reply); - int RegisterSyncCallbackOnRemote(MessageParcel& data, MessageParcel& reply); - int UnRegisterSyncCallbackOnRemote(MessageParcel& data, MessageParcel& reply); - int PutBatchOnRemote(MessageParcel& data, MessageParcel& reply); - int DeleteBatchOnRemote(MessageParcel& data, MessageParcel& reply); - int StartTransactionOnRemote(MessageParcel& data, MessageParcel& reply); - int CommitOnRemote(MessageParcel& data, MessageParcel& reply); - int RollbackOnRemote(MessageParcel& data, MessageParcel& reply); - int ControlOnRemote(MessageParcel& data, MessageParcel& reply); - int OnCapabilityEnableRequest(MessageParcel& data, MessageParcel& reply); - int OnCapabilityRangeRequest(MessageParcel& data, MessageParcel& reply); - int OnSecurityLevelRequest(MessageParcel& data, MessageParcel& reply); + int PutOnRemote(MessageParcel &data, MessageParcel &reply); + int DeleteOnRemote(MessageParcel &data, MessageParcel &reply); + int GetOnRemote(MessageParcel &data, MessageParcel &reply); + int SubscribeKvStoreOnRemote(MessageParcel &data, MessageParcel &reply); + int UnSubscribeKvStoreOnRemote(MessageParcel &data, MessageParcel &reply); + int GetEntriesOnRemote(MessageParcel &data, MessageParcel &reply); + int GetEntriesWithQueryOnRemote(MessageParcel &data, MessageParcel &reply); + int SyncOnRemote(MessageParcel &data, MessageParcel &reply); + int GetResultSetOnRemote(MessageParcel &data, MessageParcel &reply); + int GetResultSetWithQueryOnRemote(MessageParcel &data, MessageParcel &reply); + int GetCountWithQueryOnRemote(MessageParcel &data, MessageParcel &reply); + int CloseResultSetOnRemote(MessageParcel &data, MessageParcel &reply); + int RemoveDeviceDataOnRemote(MessageParcel &data, MessageParcel &reply); + int RegisterSyncCallbackOnRemote(MessageParcel &data, MessageParcel &reply); + int UnRegisterSyncCallbackOnRemote(MessageParcel &data, MessageParcel &reply); + int PutBatchOnRemote(MessageParcel &data, MessageParcel &reply); + int DeleteBatchOnRemote(MessageParcel &data, MessageParcel &reply); + int StartTransactionOnRemote(MessageParcel &data, MessageParcel &reply); + int CommitOnRemote(MessageParcel &data, MessageParcel &reply); + int RollbackOnRemote(MessageParcel &data, MessageParcel &reply); + int ControlOnRemote(MessageParcel &data, MessageParcel &reply); + int OnCapabilityEnableRequest(MessageParcel &data, MessageParcel &reply); + int OnCapabilityRangeRequest(MessageParcel &data, MessageParcel &reply); + int OnSecurityLevelRequest(MessageParcel &data, MessageParcel &reply); + int OnSyncRequest(MessageParcel &data, MessageParcel &reply); - int WriteEntriesParcelable(MessageParcel& reply, Status status, std::vector entries, int bufferSize); + int OnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply); + int OnUnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply); + int WriteEntriesParcelable(MessageParcel &reply, Status status, std::vector entries, int bufferSize); int GetTotalEntriesSize(std::vector entries); using RequestHandler = int(SingleKvStoreStub::*)(MessageParcel&, MessageParcel&); @@ -148,6 +156,9 @@ private: [SETCAPABILITYENABLED] = &SingleKvStoreStub::OnCapabilityEnableRequest, [SETCAPABILITYRANGE] = &SingleKvStoreStub::OnCapabilityRangeRequest, [SETSECURITLEVEL] = &SingleKvStoreStub::OnSecurityLevelRequest, + [SYNC_WITH_CONDITION] = &SingleKvStoreStub::OnSyncRequest, + [SUBSCRIBE_WITH_QUERY] = &SingleKvStoreStub::OnSubscribeWithQueryRequest, + [UNSUBSCRIBE_WITH_QUERY] = &SingleKvStoreStub::OnUnSubscribeWithQueryRequest, }; }; @@ -167,7 +178,8 @@ public: std::function)> callback); virtual Status CloseResultSet(sptr resultSet); virtual Status GetCountWithQuery(const std::string &query, int &result); - virtual Status Sync(const std::vector &deviceIdList, const SyncMode &mode, uint32_t allowedDelayMs); + virtual Status Sync(const std::vector &deviceIdList, SyncMode mode, uint32_t allowedDelayMs); + virtual Status Sync(const std::vector &deviceIdList, SyncMode mode, const std::string &query); virtual Status RemoveDeviceData(const std::string &device); virtual Status RegisterSyncCallback(sptr callback); virtual Status UnRegisterSyncCallback(); @@ -181,6 +193,8 @@ public: virtual Status SetCapabilityRange(const std::vector &localLabels, const std::vector &remoteSupportLabels); virtual Status GetSecurityLevel(SecurityLevel &securityLevel); + virtual Status SubscribeWithQuery(const std::vector &deviceIdList, const std::string &query); + virtual Status UnSubscribeWithQuery(const std::vector &deviceIdList, const std::string &query); private: static inline BrokerDelegator delegator_; }; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp index 7d9c8f6685811d51cc80ea26d2ed403e316273e1..2a2b5440ae4ceb53c2dd809beef8d6e10938df63 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/ikvstore_single.cpp @@ -461,7 +461,7 @@ Status SingleKvStoreProxy::CloseResultSet(sptr resultSetPtr) return static_cast(reply.ReadInt32()); } -Status SingleKvStoreProxy::Sync(const std::vector &deviceIdList, const SyncMode &mode, +Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs) { MessageParcel data; @@ -471,7 +471,7 @@ Status SingleKvStoreProxy::Sync(const std::vector &deviceIdList, co return Status::IPC_ERROR; } MessageOption mo { MessageOption::TF_SYNC }; - if (!data.WriteStringVector(deviceIdList) || + if (!data.WriteStringVector(deviceIds) || !data.WriteInt32(static_cast(mode))) { ZLOGW("SendRequest write parcel failed."); return Status::IPC_ERROR; @@ -487,6 +487,31 @@ Status SingleKvStoreProxy::Sync(const std::vector &deviceIdList, co } return static_cast(reply.ReadInt32()); } +Status SingleKvStoreProxy::Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) +{ + MessageParcel data; + if (!data.WriteInterfaceToken(SingleKvStoreProxy::GetDescriptor())) { + ZLOGE("write descriptor failed"); + return Status::IPC_ERROR; + } + MessageOption mo { MessageOption::TF_SYNC }; + if (!data.WriteStringVector(deviceIds) || + !data.WriteInt32(static_cast(mode))) { + ZLOGE("SendRequest write parcel failed."); + return Status::IPC_ERROR; + } + if (!data.WriteString(query)) { + ZLOGE("write query fail"); + return Status::IPC_ERROR; + } + MessageParcel reply; + int32_t error = Remote()->SendRequest(SYNC_WITH_CONDITION, data, reply, mo); + if (error != 0) { + ZLOGE("SendRequest returned %d", error); + return Status::IPC_ERROR; + } + return static_cast(reply.ReadInt32()); +} Status SingleKvStoreProxy::RemoveDeviceData(const std::string &device) { @@ -823,6 +848,56 @@ Status SingleKvStoreProxy::GetSecurityLevel(SecurityLevel &securityLevel) return status; } +Status SingleKvStoreProxy::SubscribeWithQuery(const std::vector &deviceIds, const std::string &query) +{ + MessageParcel data; + if (!data.WriteInterfaceToken(SingleKvStoreProxy::GetDescriptor())) { + ZLOGE("write descriptor failed"); + return Status::IPC_ERROR; + } + if (!data.WriteStringVector(deviceIds)) { + ZLOGE("SendRequest write parcel failed."); + return Status::IPC_ERROR; + } + if (!data.WriteString(query)) { + ZLOGE("write query fail"); + return Status::IPC_ERROR; + } + MessageParcel reply; + MessageOption mo { MessageOption::TF_SYNC }; + int32_t error = Remote()->SendRequest(SUBSCRIBE_WITH_QUERY, data, reply, mo); + if (error != 0) { + ZLOGE("SendRequest returned %d", error); + return Status::IPC_ERROR; + } + return static_cast(reply.ReadInt32()); +} + +Status SingleKvStoreProxy::UnSubscribeWithQuery(const std::vector &deviceIds, const std::string &query) +{ + MessageParcel data; + if (!data.WriteInterfaceToken(SingleKvStoreProxy::GetDescriptor())) { + ZLOGE("write descriptor failed"); + return Status::IPC_ERROR; + } + if (!data.WriteStringVector(deviceIds)) { + ZLOGE("SendRequest write parcel failed."); + return Status::IPC_ERROR; + } + if (!data.WriteString(query)) { + ZLOGE("write query fail"); + return Status::IPC_ERROR; + } + MessageParcel reply; + MessageOption mo { MessageOption::TF_SYNC }; + int32_t error = Remote()->SendRequest(UNSUBSCRIBE_WITH_QUERY, data, reply, mo); + if (error != 0) { + ZLOGE("SendRequest returned %d", error); + return Status::IPC_ERROR; + } + return static_cast(reply.ReadInt32()); +} + int SingleKvStoreStub::PutOnRemote(MessageParcel &data, MessageParcel &reply) { const int bufferSize = data.ReadInt32(); @@ -1346,7 +1421,7 @@ int SingleKvStoreStub::ControlOnRemote(MessageParcel &data, MessageParcel &reply return 0; } -int SingleKvStoreStub::OnSecurityLevelRequest(MessageParcel& data, MessageParcel &reply) +int SingleKvStoreStub::OnSecurityLevelRequest(MessageParcel &data, MessageParcel &reply) { SecurityLevel securityLevel = SecurityLevel::NO_LABEL; auto status = GetSecurityLevel(securityLevel); @@ -1505,6 +1580,27 @@ int SingleKvStoreStub::OnCapabilityEnableRequest(MessageParcel &data, MessagePar return 0; } +int SingleKvStoreStub::OnSyncWithConditionRequest(MessageParcel &data, MessageParcel &reply) +{ + std::vector devices; + if (!data.ReadStringVector(&devices) || devices.empty()) { + ZLOGI("SYNC list:%zu", devices.size()); + if (!reply.WriteInt32(static_cast(Status::INVALID_ARGUMENT))) { + ZLOGE("write sync status fail"); + return -1; + } + return 0; + } + auto mode = static_cast(data.ReadInt32()); + auto query = data.ReadString(); + Status status = SyncWithCondition(devices, mode, query); + if (!reply.WriteInt32(static_cast(status))) { + ZLOGE("write sync status fail"); + return -1; + } + return 0; +} + int SingleKvStoreStub::OnRemoteRequest(uint32_t code, MessageParcel &data, MessageParcel &reply, MessageOption &option) { @@ -1522,4 +1618,44 @@ int SingleKvStoreStub::OnRemoteRequest(uint32_t code, MessageParcel &data, Messa return IPCObjectStub::OnRemoteRequest(code, data, reply, mo); } } + +int SingleKvStoreStub::OnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply) +{ + std::vector devices; + if (!data.ReadStringVector(&devices) || devices.empty()) { + ZLOGE("SYNC list:%zu", devices.size()); + if (!reply.WriteInt32(static_cast(Status::INVALID_ARGUMENT))) { + ZLOGE("write sync status fail"); + return -1; + } + return 0; + } + auto query = data.ReadString(); + Status status = SubscribeWithQuery(devices, query); + if (!reply.WriteInt32(static_cast(status))) { + ZLOGE("write sync status fail"); + return -1; + } + return 0; +} + +int SingleKvStoreStub::OnUnSubscribeWithQueryRequest(MessageParcel &data, MessageParcel &reply) +{ + std::vector devices; + if (!data.ReadStringVector(&devices) || devices.empty()) { + ZLOGE("SYNC list:%zu", devices.size()); + if (!reply.WriteInt32(static_cast(Status::INVALID_ARGUMENT))) { + ZLOGE("write sync status fail"); + return -1; + } + return 0; + } + auto query = data.ReadString(); + Status status = UnSubscribeWithQuery(devices, query); + if (!reply.WriteInt32(static_cast(status))) { + ZLOGE("write sync status fail"); + return -1; + } + return 0; +} } // namespace OHOS::DistributedKv diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp index 3ac39f05e5b0180c7d643e032d061f6194ce15b7..b881b08dc8f9c2f765b61b8594917493bdac09bf 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.cpp @@ -164,8 +164,7 @@ Status SingleKvStoreClient::GetCountWithQuery(const DataQuery &query, int &resul return GetCountWithQuery(query.ToString(), result); } -Status SingleKvStoreClient::Sync(const std::vector &deviceIdList, const SyncMode &mode, - uint32_t allowedDelayMs) +Status SingleKvStoreClient::Sync(const std::vector &deviceIdList, SyncMode mode, uint32_t allowedDelayMs) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__), true); @@ -448,6 +447,46 @@ Status SingleKvStoreClient::GetSecurityLevel(SecurityLevel &securityLevel) const return Status::SERVER_UNAVAILABLE; } +Status SingleKvStoreClient::SyncWithCondition(const std::vector &deviceIdList, SyncMode mode, + const DataQuery &query) +{ + if (kvStoreProxy_ == nullptr) { + ZLOGE("singleKvstore proxy is nullptr."); + return Status::SERVER_UNAVAILABLE; + } + if (deviceIdList.empty()) { + ZLOGW("deviceIdList is empty."); + return Status::INVALID_ARGUMENT; + } + return kvStoreProxy_->SyncWithCondition(deviceIdList, mode, query.ToString()); +} + +Status SingleKvStoreClient::SubscribeWithQuery(const std::vector& deviceIdList, const DataQuery& query) +{ + if (kvStoreProxy_ == nullptr) { + ZLOGE("singleKvstore proxy is nullptr."); + return Status::SERVER_UNAVAILABLE; + } + if (deviceIdList.empty()) { + ZLOGW("deviceIdList is empty."); + return Status::INVALID_ARGUMENT; + } + return kvStoreProxy_->SubscribeWithQuery(deviceIdList, query.ToString()); +} + +Status SingleKvStoreClient::UnSubscribeWithQuery(const std::vector& deviceIdList, const DataQuery& query) +{ + if (kvStoreProxy_ == nullptr) { + ZLOGE("singleKvstore proxy is nullptr."); + return Status::SERVER_UNAVAILABLE; + } + if (deviceIdList.empty()) { + ZLOGW("deviceIdList is empty."); + return Status::INVALID_ARGUMENT; + } + return kvStoreProxy_->UnSubscribeWithQuery(deviceIdList, query.ToString()); +} + Status SingleKvStoreClient::GetKvStoreSnapshot(std::shared_ptr observer, std::shared_ptr &snapshot) const { diff --git a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h index 633649f9af3ed4f787e17e0e554682bc95cd6930..646f7ac5c9c725162a5978c2900edaabc5ddab42 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h +++ b/frameworks/innerkitsimpl/distributeddatafwk/src/single_kvstore_client.h @@ -48,7 +48,7 @@ public: Status GetCountWithQuery(const DataQuery &query, int &result) const override; - Status Sync(const std::vector &deviceIdList, const SyncMode &mode, uint32_t allowedDelayMs) override; + Status Sync(const std::vector &deviceIdList, SyncMode mode, uint32_t allowedDelayMs) override; Status RemoveDeviceData(const std::string &device) override; @@ -84,6 +84,11 @@ public: const std::vector &remoteSupportLabels) const override; Status GetSecurityLevel(SecurityLevel &securityLevel) const override; + Status SyncWithCondition(const std::vector &deviceIdList, SyncMode mode, + const DataQuery &query) override; + + Status SubscribeWithQuery(const std::vector &deviceIdList, const DataQuery &query) override; + Status UnSubscribeWithQuery(const std::vector &deviceIdList, const DataQuery &query) override; Status GetKvStoreSnapshot(std::shared_ptr observer, std::shared_ptr &snapshot) const override; Status ReleaseKvStoreSnapshot(std::shared_ptr &snapshot) override; diff --git a/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp b/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp index dc9bd854ca984418467cd7d952eddceaf9014227..08a1a52e4e4831208f55548f7d780ab44e40e3e5 100755 --- a/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp +++ b/frameworks/innerkitsimpl/distributeddatafwk/test/unittest/single_kvstore_client_test.cpp @@ -1053,3 +1053,55 @@ HWTEST_F(SingleKvStoreClientTest, SingleKvStoreDeviceSync002 ,TestSize.Level1) auto testStatus = schemaSingleKvStorePtr->SetCapabilityRange(local, remote); EXPECT_EQ(testStatus, Status::SUCCESS) << "set range fail"; } + +/** +* @tc.name: SyncWithCondition001 +* @tc.desc: sync device data with condition; +* @tc.type: FUNC +* @tc.require: AR000GH097 +* @tc.author: liuwenhui +*/ +HWTEST_F(SingleKvStoreClientTest, SyncWithCondition001, TestSize.Level1) +{ + EXPECT_NE(singleKvStorePtr, nullptr) << "kvStorePtr is null."; + std::vector deviceIds = {"invalid_device_id1", "invalid_device_id2"}; + DataQuery dataQuery; + dataQuery.KeyPrefix("name"); + auto syncStatus = singleKvStorePtr->SyncWithCondition(deviceIds, SyncMode::PUSH, dataQuery); + EXPECT_NE(syncStatus, Status::SUCCESS) << "sync device should not return success"; +} + +/** + * @tc.name: SubscribeWithQuery001 + * desc: subscribe and sync device data with query; + * type: FUNC + * require: AR000GH096 + * author:taoyuxin + */ +HWTEST_F(SingleKvStoreClientTest, SubscribeWithQuery001, TestSize.Level1) +{ + EXPECT_NE(singleKvStorePtr, nullptr) << "kvStorePtr is null."; + std::vector deviceIds = {"invalid_device_id1", "invalid_device_id2"}; + DataQuery dataQuery; + dataQuery.KeyPrefix("name"); + auto syncStatus = singleKvStorePtr->SubscribeWithQuery(deviceIds, dataQuery); + EXPECT_NE(syncStatus, Status::SUCCESS) << "sync device should not return success"; +} + +/** + * @tc.name: UnSubscribeWithQuery001 + * desc: subscribe and sync device data with query; + * type: FUNC + * require: SR000GH095 + * author:taoyuxin + */ +HWTEST_F(SingleKvStoreClientTest, UnSubscribeWithQuery001, TestSize.Level1) +{ + EXPECT_NE(singleKvStorePtr, nullptr) << "kvStorePtr is null."; + std::vector deviceIds = {"invalid_device_id1", "invalid_device_id2"}; + DataQuery dataQuery; + dataQuery.KeyPrefix("name"); + auto unSubscribeStatus = singleKvStorePtr->UnSubscribeWithQuery(deviceIds, dataQuery); + EXPECT_NE(unSubscribeStatus, Status::SUCCESS) << "sync device should not return success"; +} + diff --git a/interfaces/innerkits/distributeddata/include/single_kvstore.h b/interfaces/innerkits/distributeddata/include/single_kvstore.h index b69003d99a38933680868a802dae3229e7dc83aa..fb79268e9d55e8827f0a215cefe7bd7c57764ca6 100755 --- a/interfaces/innerkits/distributeddata/include/single_kvstore.h +++ b/interfaces/innerkits/distributeddata/include/single_kvstore.h @@ -118,7 +118,7 @@ public: // allowedDelayMs: allowed delay milli-second to sync. default value is 0 for compatibility. // Return: // Status of this Sync operation. - KVSTORE_API virtual Status Sync(const std::vector &deviceIdList, const SyncMode &mode, + KVSTORE_API virtual Status Sync(const std::vector &deviceIdList, SyncMode mode, uint32_t allowedDelayMs = 0) = 0; // Remove the device data synced from remote. @@ -172,6 +172,43 @@ public: const std::vector &remoteSupportLabels) const = 0; KVSTORE_API virtual Status GetSecurityLevel(SecurityLevel &securityLevel) const = 0; + + /* + * Sync store with other devices only syncing the data which is satisfied with the condition. + * This is an asynchronous method, sync will fail if there is a syncing operation in progress. + * Parameters: + * deviceIdList: device list to sync, this is network id from soft bus. + * query: the query condition. + * mode: mode can be set to SyncMode::PUSH, SyncMode::PULL and SyncMode::PUSH_PULL. PUSH_PULL will firstly + * push all not-local store to listed devices, then pull these stores back. + * Return: + * Status of this Sync operation. + */ + KVSTORE_API virtual Status SyncWithCondition(const std::vector &deviceIdList, SyncMode mode, + const DataQuery &query) = 0; + + /* + * Subscribe store with other devices consistently Synchronize the data which is satisfied with the condition. + * Parameters: + * deviceIdList: device list to sync, this is network id from soft bus. + * query: the query condition. + * Return: + * Status of this Subscribe operation. + */ + KVSTORE_API virtual Status SubscribeWithQuery(const std::vector &deviceIdList, + const DataQuery &query) = 0; + + /* + * UnSubscribe store with other devices which is satisfied with the condition. + * Parameters: + * deviceIdList: device list to sync, this is network id from soft bus. + * query: the query condition. + * Return: + * Status of this UnSubscribe operation. + */ + KVSTORE_API virtual Status UnSubscribeWithQuery(const std::vector &deviceIdList, + const DataQuery &query) = 0; + protected: // control this store. // Parameters: diff --git a/interfaces/innerkits/distributeddata/include/types.h b/interfaces/innerkits/distributeddata/include/types.h index 471936e796f89cf09c7e0b526b675503d6c77e3f..d94c3b0d0ced7137505a0fedbac91c3ccdb30f24 100755 --- a/interfaces/innerkits/distributeddata/include/types.h +++ b/interfaces/innerkits/distributeddata/include/types.h @@ -126,6 +126,7 @@ enum class Status { MIGRATION_KVSTORE_FAILED = DISTRIBUTEDDATAMGR_ERR_OFFSET + 30, EXCEED_MAX_ACCESS_RATE = DISTRIBUTEDDATAMGR_ERR_OFFSET + 31, SECURITY_LEVEL_ERROR = DISTRIBUTEDDATAMGR_ERR_OFFSET + 32, + OVER_MAX_SUBSCRIBE_LIMITS = DISTRIBUTEDDATAMGR_ERR_OFFSET + 33, }; enum class SubscribeType { diff --git a/services/distributeddataservice/app/src/kvstore_data_service.cpp b/services/distributeddataservice/app/src/kvstore_data_service.cpp index 05b09a0be6475e00b3569cf3cbdad09208cee039..78e01b8d200a9b6d64eab74805c2f7fc86cef710 100644 --- a/services/distributeddataservice/app/src/kvstore_data_service.cpp +++ b/services/distributeddataservice/app/src/kvstore_data_service.cpp @@ -1072,6 +1072,7 @@ void KvStoreDataService::ResolveAutoLaunchParamByIdentifier(const std::string &i option.createDirByStoreIdOnly = true; option.dataDir = entry.second.kvStoreMetaData.dataDir; option.secOption = KvStoreAppManager::ConvertSecurity(entry.second.kvStoreMetaData.securityLevel); + option.isAutoSync = entry.second.kvStoreMetaData.isAutoSync; param.userId = userId; param.appId = entry.second.kvStoreMetaData.appId; param.storeId = entry.second.kvStoreMetaData.storeId; @@ -1084,6 +1085,8 @@ void KvStoreDataService::ResolveAutoLaunchParamByIdentifier(const std::string &i bool KvStoreDataService::CheckPermissions(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, uint8_t flag) const { + ZLOGI("userId=%.6s appId=%s storeId=%s flag=%d deviceId=%.4s", userId.c_str(), appId.c_str(), storeId.c_str(), flag, + deviceId.c_str()); // only print 4 chars of device id auto &instance = KvStoreMetaManager::GetInstance(); KvStoreMetaData metaData; auto localDevId = DeviceKvStoreImpl::GetLocalDeviceId(); diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.cpp b/services/distributeddataservice/app/src/single_kvstore_impl.cpp index 3d814333063c2910de13b819a71f875aac800209..d8a959c3b65cfe0f002dc6e89b803ef872b2e895 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.cpp +++ b/services/distributeddataservice/app/src/single_kvstore_impl.cpp @@ -148,6 +148,10 @@ Status SingleKvStoreImpl::ConvertDbStatus(DistributedDB::DBStatus status) return Status::INVALID_QUERY_FIELD; case DistributedDB::DBStatus::NOT_SUPPORT: return Status::NOT_SUPPORT; + case DistributedDB::DBStatus::TIME_OUT: + return Status::TIME_OUT; + case DistributedDB::DBStatus::OVER_MAX_SUBSCRIBE_LIMITS: + return Status::OVER_MAX_LIMITS; case DistributedDB::DBStatus::EKEYREVOKED_ERROR: // fallthrough case DistributedDB::DBStatus::SECURITY_OPTION_CHECK_ERROR: return Status::SECURITY_LEVEL_ERROR; @@ -303,6 +307,20 @@ int SingleKvStoreImpl::ConvertToDbObserverMode(const SubscribeType subscribeType return dbObserverMode; } + // Convert KvStore sync mode to DistributeDB sync mode. + DistributedDB::SyncMode SingleKvStoreImpl::ConvertToDbSyncMode(SyncMode syncMode) const + { + DistributedDB::SyncMode dbSyncMode; + if (syncMode == SyncMode::PUSH) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_ONLY; + } else if (syncMode == SyncMode::PULL) { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PULL_ONLY; + } else { + dbSyncMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; + } + return dbSyncMode; + } + Status SingleKvStoreImpl::UnSubscribeKvStore(const SubscribeType subscribeType, sptr observer) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); @@ -758,8 +776,7 @@ Status SingleKvStoreImpl::RemoveDeviceData(const std::string &device) return Status::ERROR; } -Status SingleKvStoreImpl::Sync(const std::vector &deviceIdList, const SyncMode &mode, - uint32_t allowedDelayMs) +Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs) { DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); ZLOGD("start."); @@ -771,26 +788,48 @@ Status SingleKvStoreImpl::Sync(const std::vector &deviceIdList, con { std::unique_lock lock(storeNbDelegateMutex_); if ((waitingSyncCount_ > 0) && - (lastSyncDeviceIdList_ == deviceIdList) && (lastSyncMode_ == mode) && (lastSyncDelayMs_ == delayMs)) { + (lastSyncDeviceIds_ == deviceIds) && (lastSyncMode_ == mode) && (lastSyncDelayMs_ == delayMs)) { return Status::SUCCESS; } - lastSyncDeviceIdList_ = deviceIdList; + lastSyncDeviceIds_ = deviceIds; lastSyncMode_ = mode; lastSyncDelayMs_ = delayMs; } - return AddSync(deviceIdList, mode, delayMs); + return AddSync(deviceIds, mode, delayMs); } -Status SingleKvStoreImpl::AddSync(const std::vector &deviceIdList, const SyncMode &mode, +Status SingleKvStoreImpl::Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) +{ + DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); + ZLOGD("start."); + if (!flowCtrlManager_.IsTokenEnough()) { + ZLOGE("flow control denied"); + return Status::EXCEED_MAX_ACCESS_RATE; + } + uint32_t delayMs = GetSyncDelayTime(0); + return AddSync(deviceIds, mode, query, delayMs); +} + +Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, SyncMode mode, uint32_t delayMs) { ZLOGD("start."); waitingSyncCount_++; return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, - std::bind(&SingleKvStoreImpl::DoSync, this, deviceIdList, mode, std::placeholders::_1), + std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, std::placeholders::_1), std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); } +Status SingleKvStoreImpl::AddSync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, uint32_t delayMs) +{ + ZLOGD("start."); + waitingSyncCount_++; + return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, + std::bind(&SingleKvStoreImpl::DoSync, this, deviceIds, mode, query, std::placeholders::_1), + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); +} + uint32_t SingleKvStoreImpl::GetSyncDelayTime(uint32_t allowedDelayMs) const { uint32_t delayMs = allowedDelayMs; @@ -821,37 +860,20 @@ void SingleKvStoreImpl::DoSyncComplete(const std::map resultMap; for (auto device : devicesSyncResult) { - if (device.second == DistributedDB::DBStatus::OK) { - resultMap[device.first] = Status::SUCCESS; - } else if (device.second == DistributedDB::DBStatus::NOT_FOUND) { - resultMap[device.first] = Status::DEVICE_NOT_FOUND; - } else if (device.second == DistributedDB::DBStatus::TIME_OUT) { - resultMap[device.first] = Status::TIME_OUT; - } else { - resultMap[device.first] = Status::ERROR; - } + resultMap[device.first] = ConvertDbStatus(device.second); } syncRetries_ = 0; ZLOGD("callback."); if (syncCallback_ != nullptr) { syncCallback_->SyncCompleted(resultMap); } -} -Status SingleKvStoreImpl::DoSync(const std::vector &deviceIdList, const SyncMode &mode, - const KvStoreSyncManager::SyncEnd &syncEnd) +Status SingleKvStoreImpl::DoSync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd) { ZLOGD("start."); - std::vector deviceUuidList; - for (auto const &device : deviceIdList) { - std::string nodeid = KvStoreUtils::GetProviderInstance().GetUuidByNodeId(device); - if (!nodeid.empty()) { - deviceUuidList.push_back(nodeid); - } else { - ZLOGW("invalid deviceId:%s.", KvStoreUtils::ToBeAnonymous(device).c_str()); - } - } - if (deviceUuidList.empty()) { + std::vector deviceUuids = MapNodeIdToUuids(deviceIds); + if (deviceUuids.empty()) { ZLOGE("not found deviceIds."); return Status::ERROR; } @@ -863,7 +885,13 @@ Status SingleKvStoreImpl::DoSync(const std::vector &deviceIdList, c } else { dbMode = DistributedDB::SyncMode::SYNC_MODE_PUSH_PULL; } - + bool isSuccess = false; + DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess); + if (!isSuccess) { + ZLOGE("StringToDbQuery failed."); + return Status::INVALID_ARGUMENT; + } + ZLOGD("StringToDbQuery success."); DistributedDB::DBStatus status; { std::shared_lock lock(storeNbDelegateMutex_); @@ -873,32 +901,173 @@ Status SingleKvStoreImpl::DoSync(const std::vector &deviceIdList, c } waitingSyncCount_--; DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__)); - status = kvStoreNbDelegate_->Sync(deviceUuidList, dbMode, syncEnd); + status = kvStoreNbDelegate_->Sync(deviceUuids, dbMode, syncEnd, dbQuery, false); ZLOGD("end: %d", static_cast(status)); } Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__}); - if (status == DistributedDB::DBStatus::OK) { - return Status::SUCCESS; + if (status == DistributedDB::DBStatus::BUSY) { + if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) { + syncRetries_++; + auto addStatus = AddSync(deviceIds, mode, query, + KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); + if (addStatus == Status::SUCCESS) { + return addStatus; + } + } } + return ConvertDbStatus(status); +} + +Status SingleKvStoreImpl::DoSync(const std::vector &deviceIds, SyncMode mode, + const KvStoreSyncManager::SyncEnd &syncEnd) +{ + ZLOGD("start."); + std::vector deviceUuids = MapNodeIdToUuid(deviceIds); + if (deviceUuids.empty()) { + ZLOGE("not found deviceIds."); + return Status::ERROR; + } + DistributedDB::SyncMode dbMode = ConvertToDbSyncMode(mode); + DistributedDB::DBStatus status; + { + std::shared_lock lock(storeNbDelegateMutex_); + if (kvStoreNbDelegate_ == nullptr) { + ZLOGE("kvstore is not open"); + return Status::ILLEGAL_STATE; + } + waitingSyncCount_--; + DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__)); + status = kvStoreNbDelegate_->Sync(deviceUuids, dbMode, syncEnd); + ZLOGD("end: %d", static_cast(status)); + } + Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__}); if (status == DistributedDB::DBStatus::BUSY) { if (syncRetries_ < KvStoreSyncManager::SYNC_RETRY_MAX_COUNT) { syncRetries_++; - auto addStatus = AddSync(deviceUuidList, mode, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); + auto addStatus = AddSync(deviceUuids, mode, KvStoreSyncManager::SYNC_DEFAULT_DELAY_MS); if (addStatus == Status::SUCCESS) { return addStatus; } } } - if (status == DistributedDB::DBStatus::DB_ERROR) { - return Status::DB_ERROR; + return ConvertDbStatus(status); +} + +std::vector SingleKvStoreImpl::MapNodeIdToUuids(const std::vector &deviceIds) +{ + std::vector deviceUuids; + for (auto const &nodeId : deviceIds) { + std::string uuid = KvStoreUtils::GetProviderInstance().GetUuidByNodeId(nodeId); + if (!uuid.empty()) { + deviceUuids.push_back(uuid); + } } - if (status == DistributedDB::DBStatus::NOT_FOUND) { - return Status::DEVICE_NOT_FOUND; + return deviceUuids; +} + +Status SingleKvStoreImpl::DoSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd) +{ + ZLOGD("start."); + std::vector deviceUuids = MapNodeIdToUuids(deviceIds); + if (deviceUuids.empty()) { + ZLOGE("not found deviceIds."); + return Status::ERROR; } - if (status == DistributedDB::DBStatus::INVALID_ARGS) { + bool isSuccess = false; + DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess); + if (!isSuccess) { + ZLOGE("StringToDbQuery failed."); return Status::INVALID_ARGUMENT; } - return Status::ERROR; + ZLOGD("StringToDbQuery success."); + DistributedDB::DBStatus status; + { + std::shared_lock lock(storeNbDelegateMutex_); + if (kvStoreNbDelegate_ == nullptr) { + ZLOGE("kvstore is not open"); + return Status::ILLEGAL_STATE; + } + DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__)); + status = kvStoreNbDelegate_->SubscribeRemoteQuery(deviceUuids, syncEnd, dbQuery, false); + ZLOGD("end: %d", static_cast(status)); + } + Reporter::GetInstance()->VisitStatistic()->Report({bundleName_, __FUNCTION__}); + return ConvertDbStatus(status); +} + +Status SingleKvStoreImpl::DoUnSubscribeWithQuery(const std::vector &deviceIds, const std::string &query, + const KvStoreSyncManager::SyncEnd &syncEnd) +{ + ZLOGD("start."); + std::vector deviceUuids = MapNodeIdToUuids(deviceIds); + if (deviceUuids.empty()) { + ZLOGE("not found deviceIds."); + return Status::ERROR; + } + bool isSuccess = false; + DistributedDB::Query dbQuery = QueryHelper::StringToDbQuery(query, isSuccess); + if (!isSuccess) { + ZLOGE("StringToDbQuery failed."); + return Status::INVALID_ARGUMENT; + } + ZLOGD("StringToDbQuery success."); + DistributedDB::DBStatus status; + { + std::shared_lock lock(storeNbDelegateMutex_); + if (kvStoreNbDelegate_ == nullptr) { + ZLOGE("kvstore is not open"); + return Status::ILLEGAL_STATE; + } + DdsTrace trace(std::string(LOG_TAG "Delegate::") + std::string(__FUNCTION__)); + status = kvStoreNbDelegate_->UnSubscribeRemoteQuery(deviceUuids, syncEnd, dbQuery, false); + ZLOGD("end: %d", static_cast(status)); + } + return ConvertDbStatus(status); +} + +Status SingleKvStoreImpl::AddSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query, uint32_t delayMs) +{ + ZLOGD("start."); + return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, + std::bind(&SingleKvStoreImpl::DoSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); +} + +Status SingleKvStoreImpl::AddUnSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query, uint32_t delayMs) +{ + ZLOGD("start."); + return KvStoreSyncManager::GetInstance()->AddSyncOperation(reinterpret_cast(this), delayMs, + std::bind(&SingleKvStoreImpl::DoUnSubscribeWithQuery, this, deviceIds, query, std::placeholders::_1), + std::bind(&SingleKvStoreImpl::DoSyncComplete, this, std::placeholders::_1)); +} + +Status SingleKvStoreImpl::SubscribeWithQuery(const std::vector &deviceIds, + const std::string &query) +{ + DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); + ZLOGD("start."); + if (!flowCtrlManager_.IsTokenEnough()) { + ZLOGE("flow control denied"); + return Status::EXCEED_MAX_ACCESS_RATE; + } + uint32_t delayMs = GetSyncDelayTime(0); + return AddSubscribeWithQuery(deviceIds, query, delayMs); +} + +Status SingleKvStoreImpl::UnSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query) +{ + DdsTrace trace(std::string(LOG_TAG "::") + std::string(__FUNCTION__)); + ZLOGD("start."); + if (!flowCtrlManager_.IsTokenEnough()) { + ZLOGE("flow control denied"); + return Status::EXCEED_MAX_ACCESS_RATE; + } + uint32_t delayMs = GetSyncDelayTime(0); + return AddUnSubscribeWithQuery(deviceIds, query, delayMs); } InnerStatus SingleKvStoreImpl::Close(DistributedDB::KvStoreDelegateManager *kvStoreDelegateManager) @@ -1482,6 +1651,7 @@ Status SingleKvStoreImpl::GetSecurityLevel(SecurityLevel &securityLevel) } return Status::SUCCESS; } + void SingleKvStoreImpl::OnDump(int fd) const { const std::string prefix(12, ' '); diff --git a/services/distributeddataservice/app/src/single_kvstore_impl.h b/services/distributeddataservice/app/src/single_kvstore_impl.h index db786739864b7f9a35cbffec0ed3ed95b7c3ab5f..d53c0270f2853cedf890b55970e5013d62406afe 100755 --- a/services/distributeddataservice/app/src/single_kvstore_impl.h +++ b/services/distributeddataservice/app/src/single_kvstore_impl.h @@ -50,7 +50,8 @@ public: std::function)> callback) override; Status GetCountWithQuery(const std::string &query, int &result) override; Status CloseResultSet(sptr resultSet) override; - Status Sync(const std::vector &deviceIdList, const SyncMode &mode, uint32_t allowedDelayMs) override; + Status Sync(const std::vector &deviceIds, SyncMode mode, uint32_t allowedDelayMs) override; + Status Sync(const std::vector &deviceIds, SyncMode mode, const std::string &query) override; Status RemoveDeviceData(const std::string &device) override; Status RegisterSyncCallback(sptr callback) override; Status UnRegisterSyncCallback() override; @@ -82,16 +83,34 @@ public: private: Status ConvertDbStatus(DistributedDB::DBStatus dbStatus); uint32_t GetSyncDelayTime(uint32_t allowedDelayMs) const; - Status AddSync(const std::vector &deviceIdList, const SyncMode &mode, uint32_t delayMs); + Status AddSync(const std::vector &deviceIds, SyncMode mode, uint32_t delayMs); + Status AddSync(const std::vector &deviceIds, SyncMode mode, + const std::string &query, uint32_t delayMs); Status RemoveAllSyncOperation(); void DoSyncComplete(const std::map &devicesSyncResult); - Status DoSync(const std::vector &deviceIdList, const SyncMode &mode, + Status DoSync(const std::vector &deviceIds, const SyncMode mode, + const KvStoreSyncManager::SyncEnd &syncEnd); + Status DoSync(const std::vector &deviceIds, const SyncMode mode, const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); Status AddAutoSync(); Status DoAutoSync(const KvStoreSyncManager::SyncEnd &); Status RebuildKvStoreObserver(DistributedDB::KvStoreNbDelegate *kvStoreNbDelegate); Status RebuildKvStoreResultSet(); int ConvertToDbObserverMode(SubscribeType subscribeType) const; + DistributedDB::SyncMode ConvertToDbSyncMode(SyncMode syncMode) const; + Status DoSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); + Status AddSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query, uint32_t delayMs); + Status SubscribeWithQuery(const std::vector &deviceIds, + const std::string &query) override; + Status DoUnSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query, const KvStoreSyncManager::SyncEnd &syncEnd); + Status AddUnSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query, uint32_t delayMs); + Status UnSubscribeWithQuery(const std::vector &deviceIds, + const std::string &query) override; + std::vector MapNodeIdToUuids(const std::vector &deviceIds); // kvstore options. const Options options_; @@ -110,7 +129,7 @@ private: std::atomic_uint32_t waitingSyncCount_{ 0 }; std::atomic_uint32_t waitingAutoSyncCount_{ 0 }; std::atomic_uint32_t syncRetries_{ 0 }; - std::vector lastSyncDeviceIdList_{}; + std::vector lastSyncDeviceIds_{}; SyncMode lastSyncMode_{ SyncMode::PULL }; uint32_t lastSyncDelayMs_{ 0 };