#pragma once

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

namespace CesiumUtility {
template <typename T> class SharedAsset;
}

namespace CesiumAsync {

/**
 * @brief The default context passed to \ref SharedAssetDepot factory functions.
 */
struct SharedAssetContext {
  /**
   * @brief The async system.
   */
  AsyncSystem asyncSystem;

  /**
   * @brief The asset accessor.
   */
  std::shared_ptr<IAssetAccessor> pAssetAccessor;
};

/**
 * @brief A depot for {@link CesiumUtility::SharedAsset} instances, which are
 * potentially shared between multiple objects.
 *
 * @tparam TAssetType The type of asset stored in this depot. This should
 * be derived from {@link CesiumUtility::SharedAsset}.
 * @tparam TAssetKey The key type used to uniquely identify assets in this
 * depot.
 * @tparam TContext The type of context passed to the factory function when
 * creating a new asset. This defaults to \ref SharedAssetContext. This type
 * must contain a field named `asyncSystem` of type \ref AsyncSystem.
 */
template <
    typename TAssetType,
    typename TAssetKey,
    typename TContext = SharedAssetContext>
class CESIUMASYNC_API SharedAssetDepot
    : public CesiumUtility::ReferenceCountedThreadSafe<
          SharedAssetDepot<TAssetType, TAssetKey, TContext>>,
      public CesiumUtility::IDepotOwningAsset<TAssetType> {
public:
  /**
   * @brief The maximum total byte usage of assets that have been loaded but are
   * no longer needed.
   *
   * When cached assets are no longer needed, they're marked as
   * candidates for deletion. However, this deletion doesn't actually occur
   * until the total byte usage of deletion candidates exceeds this threshold.
   * At that point, assets are cleaned up in the order that they were marked for
   * deletion until the total dips below this threshold again.
   *
   * Default is 16MiB.
   */
  std::atomic<int64_t> inactiveAssetSizeLimitBytes =
      static_cast<int64_t>(16 * 1024 * 1024);

  /**
   * @brief Signature for the callback function that will be called to fetch and
   * create a new instance of `TAssetType` if one with the given key doesn't
   * already exist in the depot.
   *
   * @param context The `TContext` that was passed to \ref getOrCreate. With the
   * default \ref SharedAssetContext, this carries the \ref AsyncSystem and the
   * \ref IAssetAccessor to use to fetch the asset.
   * @param key The `TAssetKey` for the asset that should be loaded by this
   * factory.
   * @returns A \ref CesiumAsync::Future "Future" that resolves to a \ref
   * CesiumUtility::ResultPointer "ResultPointer" containing the loaded asset,
   * or any error information if the asset failed to load.
   */
  using FactorySignature =
      CesiumAsync::Future<CesiumUtility::ResultPointer<TAssetType>>(
          const TContext& context,
          const TAssetKey& key);

  /**
   * @brief Creates a new `SharedAssetDepot` using the given factory callback to
   * load new assets.
   *
   * @param factory The factory to use to fetch and create assets that don't
   * already exist in the depot. See \ref FactorySignature.
   */
  SharedAssetDepot(std::function<FactorySignature> factory);

  virtual ~SharedAssetDepot();

  /**
   * @brief Gets an asset from the depot if it already exists, or creates it
   * using the depot's factory if it does not.
   *
   * @param context The context to pass to the factory function.
   * @param assetKey The key uniquely identifying the asset to get or create.
   * @return A shared future that resolves when the asset is ready or fails.
   */
  SharedFuture<CesiumUtility::ResultPointer<TAssetType>>
  getOrCreate(const TContext& context, const TAssetKey& assetKey);

  /**
   * @brief Invalidates the previously-cached asset with the given key, so that
   * the next call to {@link getOrCreate} will create the asset instead of
   * returning the existing one.
   *
   * Anyone already using the existing asset may continue to do so.
   *
   * If an asset with the given key does not exist in the depot, this method
   * does nothing.
   *
   * @param assetKey The asset key to invalidate.
   * @returns True if the asset was invalidated; false if the asset key does not
   * exist in the depot or was already invalidated.
   */
  bool invalidate(const TAssetKey& assetKey);

  /**
   * @brief Invalidates the previously-cached asset, so that the next call to
   * {@link getOrCreate} will create the asset instead of returning the existing
   * one.
   *
   * Anyone already using the existing asset may continue to do so.
   *
   * If the asset is not associated with the depot, or if it has already been
   * invalidated, this method does nothing. If another asset with the same key
   * already exists in the depot, invalidating this one will not affect it.
   *
   * @param asset The asset to invalidate.
   * @returns True if the asset was invalidated; false if the asset is not owned
   * by the depot or was already invalidated.
   */
  bool invalidate(TAssetType& asset);

  /**
   * @brief Returns the total number of distinct assets contained in this depot,
   * including both active and inactive assets.
   */
  size_t getAssetCount() const;

  /**
   * @brief Gets the number of assets owned by this depot that are active,
   * meaning that they are currently being used in one or more places.
   */
  size_t getActiveAssetCount() const;

  /**
   * @brief Gets the number of assets owned by this depot that are inactive,
   * meaning that they are not currently being used.
   */
  size_t getInactiveAssetCount() const;

  /**
   * @brief Gets the total bytes used by inactive (unused) assets owned by this
   * depot.
   */
  int64_t getInactiveAssetTotalSizeBytes() const;

  // Disable copy
  void operator=(
      const SharedAssetDepot<TAssetType, TAssetKey, TContext>& other) = delete;

private:
  struct LockHolder;

  /**
   * @brief Locks the shared asset depot for thread-safe access. It will remain
   * locked until the returned object is destroyed or the `unlock` method is
   * called on it.
   */
  LockHolder lock() const;

  /**
   * @brief Marks the given asset as a candidate for deletion.
   * Should only be called by {@link SharedAsset}. May be called from any thread.
   *
   * @param asset The asset to mark for deletion.
   * @param threadOwnsDepotLock True if the calling thread already owns the
   * depot lock; otherwise, false.
   */
  void markDeletionCandidate(const TAssetType& asset, bool threadOwnsDepotLock)
      override;

  void markDeletionCandidateUnderLock(const TAssetType& asset);

  /**
   * @brief Unmarks the given asset as a candidate for deletion.
   * Should only be called by {@link SharedAsset}. May be called from any thread.
   *
   * @param asset The asset to unmark for deletion.
   * @param threadOwnsDepotLock True if the calling thread already owns the
   * depot lock; otherwise, false.
   */
  void unmarkDeletionCandidate(
      const TAssetType& asset,
      bool threadOwnsDepotLock) override;

  void unmarkDeletionCandidateUnderLock(const TAssetType& asset);

  /**
   * @brief Invalidates the asset with the given key.
   *
   * The depot lock must be held when this is called, and it will be released by
   * the time this method returns.
   */
  bool invalidateUnderLock(LockHolder&& lock, const TAssetKey& assetKey);

  /**
   * @brief An entry for an asset owned by this depot. This is reference counted
   * so that we can keep it alive during async operations.
   */
  struct AssetEntry
      : public CesiumUtility::ReferenceCountedThreadSafe<AssetEntry> {
    AssetEntry(TAssetKey&& key_)
        : CesiumUtility::ReferenceCountedThreadSafe<AssetEntry>(),
          key(std::move(key_)),
          pAsset(),
          maybePendingAsset(),
          errorsAndWarnings(),
          sizeInDeletionList(0),
          deletionListPointers() {}

    AssetEntry(const TAssetKey& key_) : AssetEntry(TAssetKey(key_)) {}

    /**
     * @brief The unique key identifying this asset.
     */
    TAssetKey key;

    /**
     * @brief A pointer to the asset. This may be nullptr if the asset is still
     * being loaded, or if it failed to load.
     */
    std::unique_ptr<TAssetType> pAsset;

    /**
     * @brief If this asset is currently loading, this field holds a shared
     * future that will resolve when the asset load is complete. This field will
     * be empty if the asset finished loading, including if it failed to load.
     */
    std::optional<SharedFuture<CesiumUtility::ResultPointer<TAssetType>>>
        maybePendingAsset;

    /**
     * @brief The errors and warnings that occurred while loading this asset.
     * This will not contain any errors or warnings if the asset has not
     * finished loading yet.
     */
    CesiumUtility::ErrorList errorsAndWarnings;

    /**
     * @brief The size of this asset when it was added to the
     * _deletionCandidates list. This is stored so that the exact same size can
     * be subtracted later. The value of this field is undefined if the asset is
     * not currently in the _deletionCandidates list.
     */
    int64_t sizeInDeletionList;

    /**
     * @brief The next and previous pointers to entries in the
     * _deletionCandidates list.
     */
    CesiumUtility::DoublyLinkedListPointers<AssetEntry> deletionListPointers;

    CesiumUtility::ResultPointer<TAssetType> toResultUnderLock() const;
  };

  // Manages the depot's mutex. Also ensures, via IntrusivePointer, that the
  // depot won't be destroyed while the lock is held.
  struct LockHolder {
    LockHolder(
        const CesiumUtility::IntrusivePointer<const SharedAssetDepot>& pDepot);
    ~LockHolder();
    void unlock();

  private:
    // These two fields _must_ be declared in this order to guarantee that the
    // mutex is released before the depot pointer. Releasing the depot pointer
    // could destroy the depot, and that will be disastrous if the lock is still
    // held.
    CesiumUtility::IntrusivePointer<const SharedAssetDepot> pDepot;
    std::unique_lock<std::mutex> lock;
  };

  // Maps asset keys to AssetEntry instances. This collection owns the asset
  // entries.
  std::unordered_map<TAssetKey, CesiumUtility::IntrusivePointer<AssetEntry>>
      _assets;

  // Maps asset pointers to AssetEntry instances. The values in this map refer
  // to instances owned by the _assets map.
  std::unordered_map<TAssetType*, AssetEntry*> _assetsByPointer;

  // List of assets that are being considered for deletion, in the order that
  // they became unused.
  CesiumUtility::DoublyLinkedList<AssetEntry, &AssetEntry::deletionListPointers>
      _deletionCandidates;

  // The total amount of memory used by all assets in the _deletionCandidates
  // list.
  int64_t _totalDeletionCandidateMemoryUsage;

  // The number of assets that have been invalidated but that have not been
  // deleted yet. Such assets hold a pointer to the depot, so the depot must be
  // kept alive for their entire lifetime.
  int64_t _liveInvalidatedAssets;

  // Mutex serializing access to _assets, _assetsByPointer, _deletionCandidates,
  // and any AssetEntry owned by this depot.
  mutable std::mutex _mutex;

  // The factory used to create new AssetType instances.
  std::function<FactorySignature> _factory;

  // This instance keeps a reference to itself whenever it is managing active
  // assets, preventing it from being destroyed even if all other references to
  // it are dropped.
  CesiumUtility::IntrusivePointer<
      SharedAssetDepot<TAssetType, TAssetKey, TContext>>
      _pKeepAlive;
};

template <typename TAssetType, typename TAssetKey, typename TContext>
SharedAssetDepot<TAssetType, TAssetKey, TContext>::SharedAssetDepot(
    std::function<FactorySignature> factory)
    : _assets(),
      _assetsByPointer(),
      _deletionCandidates(),
      _totalDeletionCandidateMemoryUsage(0),
      _liveInvalidatedAssets(0),
      _mutex(),
      _factory(std::move(factory)),
      _pKeepAlive(nullptr) {}

template <typename TAssetType, typename TAssetKey, typename TContext>
SharedAssetDepot<TAssetType, TAssetKey, TContext>::~SharedAssetDepot() {
  // Ideally, when the depot is destroyed, all the assets it owns would become
  // independent assets. But this is extremely difficult to manage in a
  // thread-safe manner.

  // Since we're in the destructor, we can be sure no one has a reference to
  // this instance anymore. That means that no other thread can be executing
  // `getOrCreate`, and no async asset creations are in progress.

  // However, if assets owned by this depot are still alive, then other
  // threads can still be calling addReference / releaseReference on some of
  // our assets even while we're running the depot's destructor. Which means
  // that we can end up in `markDeletionCandidate` at the same time the
  // destructor is running. And in fact it's possible for a `SharedAsset` with
  // especially poor timing to call into a `SharedAssetDepot` just after it is
  // destroyed.

  // To avoid this, we use the _pKeepAlive field to maintain an artificial
  // reference to this depot whenever it owns live assets. This should keep
  // this destructor from being called except when all of its assets are also
  // in the _deletionCandidates list.
  CESIUM_ASSERT(this->_liveInvalidatedAssets == 0);
  CESIUM_ASSERT(this->_assets.size() == this->_deletionCandidates.size());
}

template <typename TAssetType, typename TAssetKey, typename TContext>
SharedFuture<CesiumUtility::ResultPointer<TAssetType>>
SharedAssetDepot<TAssetType, TAssetKey, TContext>::getOrCreate(
    const TContext& context,
    const TAssetKey& assetKey) {
  // We need to take care here to avoid two assets starting to load before the
  // first asset has added an entry and set its maybePendingAsset field.
  LockHolder lock = this->lock();

  auto existingIt = this->_assets.find(assetKey);
  if (existingIt != this->_assets.end()) {
    // We've already loaded (or are loading) an asset with this ID - we can
    // just use that.
    const AssetEntry& entry = *existingIt->second;
    if (entry.maybePendingAsset) {
      // Asset is currently loading.
      return *entry.maybePendingAsset;
    } else {
      return context.asyncSystem.createResolvedFuture(entry.toResultUnderLock())
          .share();
    }
  }

  // Calling the factory function while holding the mutex unnecessarily
  // limits parallelism. It can even lead to a bug in the scenario where the
  // `thenInWorkerThread` continuation is invoked immediately in the current
  // thread, before `thenInWorkerThread` itself returns. That would result
  // in an attempt to lock the mutex recursively, which is not allowed.

  // So we jump through some hoops here to publish "this thread is working
  // on it", then unlock the mutex, and _then_ actually call the factory
  // function.
  Promise<void> promise = context.asyncSystem.template createPromise<void>();

  // We haven't loaded or started to load this asset yet.
  // Let's do that now.
  CesiumUtility::IntrusivePointer<
      SharedAssetDepot<TAssetType, TAssetKey, TContext>>
      pDepot = this;
  CesiumUtility::IntrusivePointer<AssetEntry> pEntry = new AssetEntry(assetKey);

  auto future =
      promise.getFuture()
          .thenImmediately([pDepot, pEntry, context]() {
            return pDepot->_factory(context, pEntry->key);
          })
          .thenInWorkerThread(
              [pDepot, pEntry](CesiumUtility::Result<
                               CesiumUtility::IntrusivePointer<TAssetType>>&&
                                   result) {
                LockHolder lock = pDepot->lock();

                if (result.pValue) {
                  result.pValue->_pDepot = pDepot.get();
                  pDepot->_assetsByPointer[result.pValue.get()] = pEntry.get();
                }

                // Now that this asset is owned by the depot, we exclusively
                // control its lifetime with a std::unique_ptr.
                pEntry->pAsset =
                    std::unique_ptr<TAssetType>(result.pValue.get());
                pEntry->errorsAndWarnings = std::move(result.errors);
                pEntry->maybePendingAsset.reset();

                // The asset is initially live because we have an
                // IntrusivePointer to it right here. So make sure the depot
                // stays alive, too.
                pDepot->_pKeepAlive = pDepot;

                return pEntry->toResultUnderLock();
              })
          .catchImmediately([pDepot, pEntry](std::exception&& e) {
            // This asset has failed _with an exception_. We don't want to cache
            // this type of error.
            {
              LockHolder lock = pDepot->lock();
              pDepot->_assets.erase(pEntry->key);
            }

            return CesiumUtility::Result<
                CesiumUtility::IntrusivePointer<TAssetType>>(
                CesiumUtility::ErrorList::error(
                    std::string("Exception while creating asset: ") +
                    e.what()));
          });

  SharedFuture<CesiumUtility::ResultPointer<TAssetType>> sharedFuture =
      std::move(future).share();

  pEntry->maybePendingAsset = sharedFuture;

  [[maybe_unused]] bool added = this->_assets.emplace(assetKey, pEntry).second;

  // Should always be added successfully, because we checked above that the
  // asset key doesn't exist in the map yet.
  CESIUM_ASSERT(added);

  // Unlock the mutex and then call the factory function.
  lock.unlock();
  promise.resolve();

  return sharedFuture;
}

template <typename TAssetType, typename TAssetKey, typename TContext>
bool SharedAssetDepot<TAssetType, TAssetKey, TContext>::invalidate(
    const TAssetKey& assetKey) {
  LockHolder lock = this->lock();
  return this->invalidateUnderLock(std::move(lock), assetKey);
}

template <typename TAssetType, typename TAssetKey, typename TContext>
bool SharedAssetDepot<TAssetType, TAssetKey, TContext>::invalidate(
    TAssetType& asset) {
  LockHolder lock = this->lock();

  auto it = this->_assetsByPointer.find(&asset);
  if (it == this->_assetsByPointer.end())
    return false;

  AssetEntry* pEntry = it->second;
  CESIUM_ASSERT(pEntry);

  return this->invalidateUnderLock(std::move(lock), pEntry->key);
}

template <typename TAssetType, typename TAssetKey, typename TContext>
size_t
SharedAssetDepot<TAssetType, TAssetKey, TContext>::getAssetCount() const {
  LockHolder lock = this->lock();
  return this->_assets.size();
}

template <typename TAssetType, typename TAssetKey, typename TContext>
size_t
SharedAssetDepot<TAssetType, TAssetKey, TContext>::getActiveAssetCount() const {
  LockHolder lock = this->lock();
  return this->_assets.size() - this->_deletionCandidates.size();
}

template <typename TAssetType, typename TAssetKey, typename TContext>
size_t
SharedAssetDepot<TAssetType, TAssetKey, TContext>::getInactiveAssetCount()
    const {
  LockHolder lock = this->lock();
  return this->_deletionCandidates.size();
}

template <typename TAssetType, typename TAssetKey, typename TContext>
int64_t SharedAssetDepot<TAssetType, TAssetKey, TContext>::
    getInactiveAssetTotalSizeBytes() const {
  LockHolder lock = this->lock();
  return this->_totalDeletionCandidateMemoryUsage;
}

template <typename TAssetType, typename TAssetKey, typename TContext>
typename SharedAssetDepot<TAssetType, TAssetKey, TContext>::LockHolder
SharedAssetDepot<TAssetType, TAssetKey, TContext>::lock() const {
  return LockHolder{this};
}

template <typename TAssetType, typename TAssetKey, typename TContext>
void SharedAssetDepot<TAssetType, TAssetKey, TContext>::markDeletionCandidate(
    const TAssetType& asset,
    bool threadOwnsDepotLock) {
  if (threadOwnsDepotLock) {
    this->markDeletionCandidateUnderLock(asset);
  } else {
    LockHolder lock = this->lock();
    this->markDeletionCandidateUnderLock(asset);
  }
}

template <typename TAssetType, typename TAssetKey, typename TContext>
void SharedAssetDepot<TAssetType, TAssetKey, TContext>::
    markDeletionCandidateUnderLock(const TAssetType& asset) {
  if (asset._isInvalidated) {
    // This asset is no longer tracked by the depot, so delete it.
    --this->_liveInvalidatedAssets;
    delete &asset;

    // If this depot is not managing any live assets, then we no longer need to
    // keep it alive.
    if (this->_assets.size() == this->_deletionCandidates.size() &&
        this->_liveInvalidatedAssets == 0) {
      this->_pKeepAlive.reset();
    }
    return;
  }

  // Verify that the reference count is still zero.
  // See: https://github.com/CesiumGS/cesium-native/issues/1073
  if (asset._referenceCount != 0) {
    return;
  }

  auto it = this->_assetsByPointer.find(const_cast<TAssetType*>(&asset));
  CESIUM_ASSERT(it != this->_assetsByPointer.end());
  if (it == this->_assetsByPointer.end()) {
    return;
  }

  CESIUM_ASSERT(it->second != nullptr);

  AssetEntry& entry = *it->second;
  entry.sizeInDeletionList = asset.getSizeBytes();
  this->_totalDeletionCandidateMemoryUsage += entry.sizeInDeletionList;

  this->_deletionCandidates.insertAtTail(entry);

  if (this->_totalDeletionCandidateMemoryUsage >
      this->inactiveAssetSizeLimitBytes) {
    // Delete the deletion candidates until we're below the limit.
    while (this->_deletionCandidates.size() > 0 &&
           this->_totalDeletionCandidateMemoryUsage >
               this->inactiveAssetSizeLimitBytes) {
      AssetEntry* pOldEntry = this->_deletionCandidates.head();
      this->_deletionCandidates.remove(*pOldEntry);

      this->_totalDeletionCandidateMemoryUsage -=
          pOldEntry->sizeInDeletionList;

      CESIUM_ASSERT(
          pOldEntry->pAsset == nullptr ||
          pOldEntry->pAsset->_referenceCount == 0);

      if (pOldEntry->pAsset) {
        this->_assetsByPointer.erase(pOldEntry->pAsset.get());
      }

      // This will actually delete the asset.
      this->_assets.erase(pOldEntry->key);
    }
  }

  // If this depot is not managing any live assets, then we no longer need to
  // keep it alive.
  if (this->_assets.size() == this->_deletionCandidates.size() &&
      this->_liveInvalidatedAssets == 0) {
    this->_pKeepAlive.reset();
  }
}

template <typename TAssetType, typename TAssetKey, typename TContext>
void SharedAssetDepot<TAssetType, TAssetKey, TContext>::unmarkDeletionCandidate(
    const TAssetType& asset,
    bool threadOwnsDepotLock) {
  if (threadOwnsDepotLock) {
    this->unmarkDeletionCandidateUnderLock(asset);
  } else {
    LockHolder lock = this->lock();
    this->unmarkDeletionCandidateUnderLock(asset);
  }
}

template <typename TAssetType, typename TAssetKey, typename TContext>
void SharedAssetDepot<TAssetType, TAssetKey, TContext>::
    unmarkDeletionCandidateUnderLock(const TAssetType& asset) {
  // This asset better not already be invalidated. That would imply this asset
  // was resurrected after its reference count hit zero. This should only be
  // possible if the asset depot returned a pointer to the asset, which it
  // will not do for one that is invalidated.
  CESIUM_ASSERT(!asset._isInvalidated);

  auto it = this->_assetsByPointer.find(const_cast<TAssetType*>(&asset));
  CESIUM_ASSERT(it != this->_assetsByPointer.end());
  if (it == this->_assetsByPointer.end()) {
    return;
  }

  CESIUM_ASSERT(it->second != nullptr);

  AssetEntry& entry = *it->second;
  bool isFound = this->_deletionCandidates.contains(entry);

  // The asset won't necessarily be found in the deletionCandidates set.
  // See: https://github.com/CesiumGS/cesium-native/issues/1073
  if (isFound) {
    this->_totalDeletionCandidateMemoryUsage -= entry.sizeInDeletionList;
    this->_deletionCandidates.remove(entry);
  }

  // This depot is now managing at least one live asset, so keep it alive.
  this->_pKeepAlive = this;
}

template <typename TAssetType, typename TAssetKey, typename TContext>
bool SharedAssetDepot<TAssetType, TAssetKey, TContext>::invalidateUnderLock(
    LockHolder&& lock,
    const TAssetKey& assetKey) {
  auto it = this->_assets.find(assetKey);
  if (it == this->_assets.end())
    return false;

  AssetEntry* pEntry = it->second.get();
  CESIUM_ASSERT(pEntry);

  // This will remove the asset from the deletion candidates list, if it's
  // there.
  CesiumUtility::ResultPointer<TAssetType> assetResult =
      pEntry->toResultUnderLock();

  bool wasInvalidated = false;

  if (assetResult.pValue) {
    if (!assetResult.pValue->_isInvalidated) {
      wasInvalidated = true;
      assetResult.pValue->_isInvalidated = true;
      ++this->_liveInvalidatedAssets;
    }

    this->_assetsByPointer.erase(assetResult.pValue.get());
  }

  // Detach the asset from the AssetEntry, so that its lifetime is controlled by
  // reference counting.
  pEntry->pAsset.release();

  // Remove the asset entry. This won't immediately delete the asset, because
  // `assetResult` above still holds a reference to it. But once that goes out
  // of scope, too, the asset _may_ be destroyed.
  this->_assets.erase(it);

  // Unlock the mutex before allowing `assetResult` to go out of scope. When it
  // goes out of scope, the asset may be destroyed. If it is, that would cause
  // us to try to re-enter the lock, which is not allowed.
  lock.unlock();

  return wasInvalidated;
}

template <typename TAssetType, typename TAssetKey, typename TContext>
CesiumUtility::ResultPointer<TAssetType>
SharedAssetDepot<TAssetType, TAssetKey, TContext>::AssetEntry::
    toResultUnderLock() const {
  // This method is called while the calling thread already owns the depot
  // mutex. So we must take care not to lock it again, which could happen if
  // the asset is currently unreferenced and we naively create an
  // IntrusivePointer for it.
  CesiumUtility::IntrusivePointer<TAssetType> p = nullptr;
  if (pAsset) {
    pAsset->addReference(true);
    p = pAsset.get();
    pAsset->releaseReference(true);
  }
  return CesiumUtility::ResultPointer<TAssetType>(p, errorsAndWarnings);
}

template <typename TAssetType, typename TAssetKey, typename TContext>
SharedAssetDepot<TAssetType, TAssetKey, TContext>::LockHolder::LockHolder(
    const CesiumUtility::IntrusivePointer<const SharedAssetDepot>& pDepot_)
    : pDepot(pDepot_), lock(pDepot_->_mutex) {}

template <typename TAssetType, typename TAssetKey, typename TContext>
SharedAssetDepot<TAssetType, TAssetKey, TContext>::LockHolder::~LockHolder() =
    default;

template <typename TAssetType, typename TAssetKey, typename TContext>
void SharedAssetDepot<TAssetType, TAssetKey, TContext>::LockHolder::unlock() {
  this->lock.unlock();
}

} // namespace CesiumAsync
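
/*
Illustrative sketch only, kept inside a comment so that it is never compiled as
part of this header. The eviction policy documented on
`inactiveAssetSizeLimitBytes` (candidates are kept in the order they became
unused and are deleted oldest-first once their total size exceeds the limit)
can be modeled in isolation roughly as follows. `InactiveAssetList`, its
methods, and the string keys are hypothetical names that do not exist in this
library, and `std::list` is used here in place of
`CesiumUtility::DoublyLinkedList`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <list>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the depot's deletion-candidate bookkeeping.
class InactiveAssetList {
public:
  explicit InactiveAssetList(int64_t limitBytes) : _limitBytes(limitBytes) {}

  // Records `key` as unused, then evicts the oldest candidates for as long as
  // the total size exceeds the limit. Assumes `key` is not already listed.
  void mark(const std::string& key, int64_t sizeBytes) {
    assert(_byKey.find(key) == _byKey.end());
    _candidates.push_back(Candidate{key, sizeBytes});
    _byKey[key] = std::prev(_candidates.end());
    _totalBytes += sizeBytes;

    while (!_candidates.empty() && _totalBytes > _limitBytes) {
      const Candidate& oldest = _candidates.front();
      _totalBytes -= oldest.sizeBytes;
      _byKey.erase(oldest.key);
      _candidates.pop_front();
    }
  }

  // Removes `key` from the list because it is in use again. Returns false if
  // it was not a candidate, for example because it was already evicted.
  bool unmark(const std::string& key) {
    auto it = _byKey.find(key);
    if (it == _byKey.end())
      return false;
    _totalBytes -= it->second->sizeBytes;
    _candidates.erase(it->second);
    _byKey.erase(it);
    return true;
  }

  std::size_t count() const { return _candidates.size(); }
  int64_t totalBytes() const { return _totalBytes; }

private:
  struct Candidate {
    std::string key;
    int64_t sizeBytes;
  };

  int64_t _limitBytes;
  int64_t _totalBytes = 0;
  std::list<Candidate> _candidates;
  std::unordered_map<std::string, std::list<Candidate>::iterator> _byKey;
};
```

With a limit of 100 bytes, marking "a" (60) and "b" (30) keeps both. Marking
"c" (50) then pushes the total to 140, so "a" is evicted first and the total
drops to 80. The sketch stores each candidate's size at insertion time for the
same reason `sizeInDeletionList` does: exactly the same amount is subtracted
again later.
*/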