Documentation ¶
Index ¶
- Constants
- func SetTimeProvider(now func() time.Time)
- type GlobCollectionsMap
- func (gcm *GlobCollectionsMap[T]) IsSubscribed(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (subscribed bool)
- func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])
- func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])
- func (gcm *GlobCollectionsMap[T]) Size(gcURL ads.GlobCollectionURL) (size int)
- func (gcm *GlobCollectionsMap[T]) Subscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (wgs []*sync.WaitGroup)
- func (gcm *GlobCollectionsMap[T]) Unsubscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T])
- type Priority
- type ResourceMap
- func (m *ResourceMap[K, V]) Compute(key K, newValue func(key K) V, compute func(value V)) (v V, created bool)
- func (m *ResourceMap[K, V]) ComputeDeletion(key K, condition func(value V) (deleteEntry bool)) (deleted bool)
- func (m *ResourceMap[K, V]) ComputeIfPresent(key K, f func(value V)) (wasPresent bool)
- func (m *ResourceMap[K, V]) Range() iter.Seq2[K, V]
- func (m *ResourceMap[K, V]) Size() int
- type SubscriberSet
- func (m *SubscriberSet[T]) IsEmpty() bool
- func (m *SubscriberSet[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool
- func (m *SubscriberSet[T]) Iterator() SubscriberSetIterator[T]
- func (m *SubscriberSet[T]) Size() int
- func (m *SubscriberSet[T]) SnapshotIterator(v SubscriberSetVersion) SubscriberSetIterator[T]
- func (m *SubscriberSet[T]) Subscribe(handler ads.SubscriptionHandler[T]) (time.Time, SubscriberSetVersion)
- func (m *SubscriberSet[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)
- func (m *SubscriberSet[T]) Version() SubscriberSetVersion
- type SubscriberSetIterator
- type SubscriberSetVersion
- type WatchableValue
- func (v *WatchableValue[T]) Clear(p Priority, clearedAt time.Time) (isFullClear bool)
- func (v *WatchableValue[T]) DeleteNowOrQueueDeletion(tryDelete func(name string)) bool
- func (v *WatchableValue[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool
- func (v *WatchableValue[T]) NotifyHandlerAfterSubscription(handler ads.SubscriptionHandler[T], subType subscriptionType, ...) *sync.WaitGroup
- func (v *WatchableValue[T]) Read() *ads.Resource[T]
- func (v *WatchableValue[T]) Set(p Priority, r *ads.Resource[T], modifiedAt time.Time)
- func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T]) *sync.WaitGroup
- func (v *WatchableValue[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)
Constants ¶
const (
	// An ExplicitSubscription means the client subscribed to a resource by explicitly providing its name.
	ExplicitSubscription = subscriptionType(iota)
	// A GlobSubscription means the client subscribed to a resource by specifying its parent glob
	// collection URL, implicitly subscribing it to all the resources that are part of the collection.
	GlobSubscription
	// A WildcardSubscription means the client subscribed to a resource by specifying the wildcard
	// (ads.WildcardSubscription), implicitly subscribing it to all resources in the cache.
	WildcardSubscription
)
The following subscriptionType constants define the ways a client can subscribe to a resource. See RawCache.Subscribe for additional details.
Variables ¶
This section is empty.
Functions ¶
func SetTimeProvider ¶
func SetTimeProvider(now func() time.Time)
SetTimeProvider replaces the function the package uses to read the current time, which is important when benchmarking the cache.
Types ¶
type GlobCollectionsMap ¶
GlobCollectionsMap is used to map individual GlobCollectionURL values to their corresponding globCollection. It uses a ResourceMap under the hood because glob collections have similar semantics to cache entries:
- A globCollection is created lazily, either when an entry for that collection is created, or a subscription to that collection is made.
- A globCollection is only deleted once all subscribers have unsubscribed and the collection is empty. Crucially, a collection can be empty but will remain in the cache as long as some subscribers remain subscribed.
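The lifecycle described by the two rules above can be sketched with plain maps. This is an illustration only, assuming a single goroutine: the collection and collections types, their methods, and the example URL are all hypothetical and are not part of the package.

```go
package main

import "fmt"

// collection is a stand-in for a glob collection: a set of entry names plus a
// set of subscriber names. (Hypothetical type.)
type collection struct {
	entries     map[string]struct{}
	subscribers map[string]struct{}
}

// collections maps a collection URL to its collection. Unlike the real map,
// this sketch is not safe for concurrent use.
type collections map[string]*collection

// getOrCreate lazily creates the collection for url.
func (c collections) getOrCreate(url string) *collection {
	col, ok := c[url]
	if !ok {
		col = &collection{
			entries:     make(map[string]struct{}),
			subscribers: make(map[string]struct{}),
		}
		c[url] = col
	}
	return col
}

// deleteIfUnused removes the collection only when it has neither entries nor
// subscribers.
func (c collections) deleteIfUnused(url string) {
	if col, ok := c[url]; ok && len(col.entries) == 0 && len(col.subscribers) == 0 {
		delete(c, url)
	}
}

func (c collections) putEntry(url, name string) {
	c.getOrCreate(url).entries[name] = struct{}{}
}

func (c collections) subscribe(url, sub string) {
	c.getOrCreate(url).subscribers[sub] = struct{}{}
}

func (c collections) removeEntry(url, name string) {
	if col, ok := c[url]; ok {
		delete(col.entries, name)
		c.deleteIfUnused(url)
	}
}

func (c collections) unsubscribe(url, sub string) {
	if col, ok := c[url]; ok {
		delete(col.subscribers, sub)
		c.deleteIfUnused(url)
	}
}

func main() {
	c := collections{}
	const url = "xdstp://example/type/collection/*" // hypothetical URL
	c.putEntry(url, "foo")
	c.subscribe(url, "client-1")
	c.removeEntry(url, "foo")
	_, ok := c[url]
	fmt.Println(ok) // true: empty, but still subscribed to
	c.unsubscribe(url, "client-1")
	_, ok = c[url]
	fmt.Println(ok) // false: empty and no subscribers left
}
```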
func NewGlobCollectionsMap ¶ added in v0.2.8
func NewGlobCollectionsMap[T proto.Message]() *GlobCollectionsMap[T]
func (*GlobCollectionsMap[T]) IsSubscribed ¶
func (gcm *GlobCollectionsMap[T]) IsSubscribed(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (subscribed bool)
IsSubscribed checks if the given handler is subscribed to the collection.
func (*GlobCollectionsMap[T]) PutValueInCollection ¶
func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])
PutValueInCollection creates the glob collection if it was not already created, and puts the given value in it.
func (*GlobCollectionsMap[T]) RemoveValueFromCollection ¶
func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])
RemoveValueFromCollection removes the given value from the collection. If the collection becomes empty as a result, it is removed from the map.
func (*GlobCollectionsMap[T]) Size ¶ added in v0.2.9
func (gcm *GlobCollectionsMap[T]) Size(gcURL ads.GlobCollectionURL) (size int)
Size returns the size of the glob collection for the given URL, or 0 if no such collection exists.
func (*GlobCollectionsMap[T]) Subscribe ¶
func (gcm *GlobCollectionsMap[T]) Subscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (wgs []*sync.WaitGroup)
Subscribe creates or gets the corresponding collection for the given URL using createOrModifyCollection. It adds the given handler as a subscriber to the collection, then iterates through all the values in the collection, notifying the handler for each value. If the collection is empty, the handler will be notified that the resource is deleted. See the documentation on WatchableValue.NotifyHandlerAfterSubscription for more insight on the returned sync.WaitGroup slice.
func (*GlobCollectionsMap[T]) Unsubscribe ¶
func (gcm *GlobCollectionsMap[T]) Unsubscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T])
Unsubscribe invokes globCollection.unsubscribe on the collection for the given URL, if it exists. If, as a result, the collection becomes empty, it invokes deleteCollectionIfEmpty.
type ResourceMap ¶
type ResourceMap[K comparable, V any] xsync.Map[K, V]
ResourceMap is a concurrency-safe map. It deliberately does not expose bare Get or Put methods as its concurrency model is based on the assumption that access to the backing values must be strictly synchronized. Instead, all operations should be executed through the various Compute methods.
func NewResourceMap ¶ added in v0.2.8
func NewResourceMap[K comparable, V any]() *ResourceMap[K, V]
func (*ResourceMap[K, V]) Compute ¶
func (m *ResourceMap[K, V]) Compute(key K, newValue func(key K) V, compute func(value V)) (v V, created bool)
Compute first creates the value for the given key using the given function if no corresponding entry exists, then it executes the given compute function. It returns the value itself, and a boolean indicating whether the value was created.
func (*ResourceMap[K, V]) ComputeDeletion ¶ added in v0.2.8
func (m *ResourceMap[K, V]) ComputeDeletion(key K, condition func(value V) (deleteEntry bool)) (deleted bool)
ComputeDeletion loads the entry from the map if it still exists, then executes the given condition function with the value. If the condition returns true, the entry is deleted from the map, otherwise nothing happens. As a "compute" function, the condition is executed synchronously, in other words, it is guaranteed that no other "compute" functions are executing on that entry.
func (*ResourceMap[K, V]) ComputeIfPresent ¶
func (m *ResourceMap[K, V]) ComputeIfPresent(key K, f func(value V)) (wasPresent bool)
ComputeIfPresent invokes the given function only if a corresponding entry exists in the map for the given key.
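The three Compute-style methods can be illustrated with a much simpler structure than the real one. The sketch below assumes a single mutex guarding a built-in map, whereas ResourceMap is declared on top of xsync.Map and likely synchronizes at a finer grain. computeMap and newComputeMap are hypothetical names; only the method shapes follow the signatures documented above.

```go
package main

import (
	"fmt"
	"sync"
)

// computeMap is a simplified stand-in for ResourceMap: one mutex guards the
// whole map, and every callback runs while that mutex is held.
type computeMap[K comparable, V any] struct {
	mu sync.Mutex
	m  map[K]V
}

func newComputeMap[K comparable, V any]() *computeMap[K, V] {
	return &computeMap[K, V]{m: make(map[K]V)}
}

// Compute creates the value if it is missing, then runs compute on it.
func (c *computeMap[K, V]) Compute(key K, newValue func(key K) V, compute func(value V)) (v V, created bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.m[key]
	if !ok {
		v = newValue(key)
		c.m[key] = v
	}
	compute(v)
	return v, !ok
}

// ComputeIfPresent runs f only if an entry exists for key.
func (c *computeMap[K, V]) ComputeIfPresent(key K, f func(value V)) (wasPresent bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.m[key]
	if ok {
		f(v)
	}
	return ok
}

// ComputeDeletion deletes the entry if it exists and condition returns true.
func (c *computeMap[K, V]) ComputeDeletion(key K, condition func(value V) (deleteEntry bool)) (deleted bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	v, ok := c.m[key]
	if ok && condition(v) {
		delete(c.m, key)
		return true
	}
	return false
}

func main() {
	m := newComputeMap[string, *int]()
	newCounter := func(string) *int { return new(int) }
	increment := func(v *int) { *v++ }

	_, created := m.Compute("a", newCounter, increment)
	fmt.Println(created) // true
	_, created = m.Compute("a", newCounter, increment)
	fmt.Println(created) // false

	deleted := m.ComputeDeletion("a", func(v *int) bool { return *v >= 2 })
	fmt.Println(deleted) // true
}
```

Because callers never obtain a value outside of a callback, every read or mutation of a value is serialized with the decision to create or delete its entry, which is the property the absence of bare Get and Put methods is meant to protect.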
func (*ResourceMap[K, V]) Range ¶ added in v0.2.8
func (m *ResourceMap[K, V]) Range() iter.Seq2[K, V]
Range returns an iter.Seq2 that will iterate over all entries in this map.
func (*ResourceMap[K, V]) Size ¶ added in v0.2.8
func (m *ResourceMap[K, V]) Size() int
Size returns the current number of entries in the map.
type SubscriberSet ¶
SubscriberSet is a concurrency-safe data structure that stores a set of unique subscribers. It is specifically designed to support wildcard and glob subscriptions such that they can be shared by multiple WatchableValue instances instead of requiring each WatchableValue to store each subscriber.
After subscribing to a given value, the SubscriptionHandler is supposed to be notified of the current value immediately, which usually simply means reading WatchableValue.currentValue and notifying the handler. However, it is possible that the notification loop for the WatchableValue is already running, which could result in a double notification. To avoid this, this data structure introduces a notion of versioning. The notification loop records which version it is about to iterate over (in WatchableValue.lastSeenSubscriberSetVersions), so that subscribers can determine whether the loop will notify them and avoid the double notification. A subscriber does this by recording the version returned by SubscriberSet.Subscribe and checking whether it is less than or equal to the version in WatchableValue.lastSeenSubscriberSetVersions.
The implementation uses a sync.Map to store and iterate over the subscribers. In this case it's impossible to use a normal map since the subscriber set will be iterated over frequently. However, sync.Map provides no guarantees about what happens if the map is modified while another goroutine is iterating over the entries. Specifically, if an entry is added during the iteration, the iterator may or may not actually yield the new entry, which means the iterator may yield an entry that was added _after_ Iterator was invoked, violating the Iterator contract that it will only yield entries that were added before. To get around this, the returned iterator simply records the version at which it was initially created, and drops entries that have a greater version, making it always consistent.
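The versioning idea can be sketched as follows. This is a simplified model and not the package's code: versionedSet, its methods, and the use of plain string names in place of subscription handlers are all assumptions made for illustration. A mutex makes the version bump and the store a single step so that a snapshot never misses an entry at or below its version.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// version counts how many subscriptions the set has processed.
type version uint64

// versionedSet stores each subscriber along with the version at which it was
// added. (Hypothetical type; subscribers are plain names in this sketch.)
type versionedSet struct {
	mu          sync.Mutex
	current     version
	subscribers sync.Map // string -> version
}

// subscribe adds name and returns the version at which it was added.
func (s *versionedSet) subscribe(name string) version {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.current++
	s.subscribers.Store(name, s.current)
	return s.current
}

// currentVersion returns the version of the most recent subscription.
func (s *versionedSet) currentVersion() version {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.current
}

// snapshot returns the subscribers added at or before max, dropping any entry
// that sync.Map happens to yield with a greater version.
func (s *versionedSet) snapshot(max version) []string {
	var names []string
	s.subscribers.Range(func(k, v any) bool {
		if v.(version) <= max {
			names = append(names, k.(string))
		}
		return true
	})
	sort.Strings(names) // sorted only to make the output deterministic
	return names
}

func main() {
	s := new(versionedSet)
	s.subscribe("a")
	s.subscribe("b")
	lastSeen := s.currentVersion() // what a notification loop would record
	late := s.subscribe("c")       // arrives after the loop took its snapshot

	fmt.Println(s.snapshot(lastSeen)) // [a b]
	// The late subscriber can tell the loop will not notify it.
	fmt.Println(late <= lastSeen) // false
}
```

A subscriber whose version is less than or equal to the recorded one knows the loop will reach it and can skip its own immediate notification; a later subscriber must be notified separately.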
func (*SubscriberSet[T]) IsEmpty ¶ added in v0.2.6
func (m *SubscriberSet[T]) IsEmpty() bool
IsEmpty is a convenience function that checks whether the set is empty.
func (*SubscriberSet[T]) IsSubscribed ¶
func (m *SubscriberSet[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool
IsSubscribed checks whether the given handler is subscribed to this set.
func (*SubscriberSet[T]) Iterator ¶
func (m *SubscriberSet[T]) Iterator() SubscriberSetIterator[T]
Iterator returns a SubscriberSetIterator that will iterate over all the subscribers currently in the set.
func (*SubscriberSet[T]) Size ¶
func (m *SubscriberSet[T]) Size() int
Size returns the number of subscribers in the set. For convenience, returns 0 if the receiver is nil.
func (*SubscriberSet[T]) SnapshotIterator ¶ added in v0.2.8
func (m *SubscriberSet[T]) SnapshotIterator(v SubscriberSetVersion) SubscriberSetIterator[T]
SnapshotIterator returns a SubscriberSetIterator that will only iterate over the subscribers that were added before or at the given version.
func (*SubscriberSet[T]) Subscribe ¶
func (m *SubscriberSet[T]) Subscribe(handler ads.SubscriptionHandler[T]) (time.Time, SubscriberSetVersion)
Subscribe registers the given SubscriptionHandler as a subscriber and returns the time and version at which the subscription was processed. The returned version can be compared against the version returned by Iterator to check whether the given handler is present in the iterator.
func (*SubscriberSet[T]) Unsubscribe ¶
func (m *SubscriberSet[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)
Unsubscribe removes the given handler from the set, and returns whether the set is now empty as a result of this unsubscription.
func (*SubscriberSet[T]) Version ¶ added in v0.2.8
func (m *SubscriberSet[T]) Version() SubscriberSetVersion
Version returns the current version of this set. Invoking SubscriberSet.SnapshotIterator with the returned version will only yield subscribers added to this set at or before that version.
type SubscriberSetIterator ¶
type SubscriberSetVersion ¶
type SubscriberSetVersion uint64
SubscriberSetVersion is a monotonically increasing counter that tracks how many times subscribers have been added to a given SubscriberSet. This means a subscriber can check whether they are in a SubscriberSet by storing the version returned by SubscriberSet.Subscribe and comparing it against the version returned by SubscriberSet.Iterator.
type WatchableValue ¶
type WatchableValue[T proto.Message] struct {
	// SubscriberSets holds all the async.SubscriberSet instances relevant to this WatchableValue.
	SubscriberSets [subscriptionTypes]*SubscriberSet[T]
	// contains filtered or unexported fields
}
func NewValue ¶
func NewValue[T proto.Message](name string, prioritySlots int) *WatchableValue[T]
func (*WatchableValue[T]) Clear ¶
func (v *WatchableValue[T]) Clear(p Priority, clearedAt time.Time) (isFullClear bool)
func (*WatchableValue[T]) DeleteNowOrQueueDeletion ¶ added in v0.2.6
func (v *WatchableValue[T]) DeleteNowOrQueueDeletion(tryDelete func(name string)) bool
func (*WatchableValue[T]) IsSubscribed ¶
func (v *WatchableValue[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool
func (*WatchableValue[T]) NotifyHandlerAfterSubscription ¶
func (v *WatchableValue[T]) NotifyHandlerAfterSubscription(handler ads.SubscriptionHandler[T], subType subscriptionType, subscribedAt time.Time, version SubscriberSetVersion) *sync.WaitGroup
NotifyHandlerAfterSubscription should be invoked by subscribers after subscribing to the corresponding SubscriberSet. If the returned sync.WaitGroup is nil, the subscriber has been notified of the current value. Otherwise, the subscriber will only be guaranteed to have been notified once the WaitGroup is done (more details can be found in the inline comments of this function, but the high level reason for this is that the notification loop will handle the notification, which avoids double notifications). A WaitGroup is returned instead of this function blocking inline to avoid a number of deadlocks that can occur in glob collections. The WaitGroup should only be waited on *outside* of the glob collection critical sections such as GlobCollectionsMap.Subscribe to avoid these deadlocks.
It is critical that the WaitGroup be waited on before the top-level Subscribe function in the root package returns, as this is the API contract established by the cache. It also helps the implementation of the xDS protocol, which explicitly states the following: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#subscribing-to-resources
A resource_names_subscribe field may contain resource names that the server believes the client is already subscribed to, and furthermore has the most recent versions of. However, the server must still provide those resources in the response; due to implementation details hidden from the server, the client may have "forgotten" those resources despite apparently remaining subscribed.
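The reason NotifyHandlerAfterSubscription returns a WaitGroup instead of blocking can be illustrated with a small model. The registry type and its methods below are hypothetical and far simpler than the cache: a single mutex plays the role of a glob collection critical section, and each pending notification needs that same mutex to complete.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for a structure with a critical section.
type registry struct {
	mu       sync.Mutex
	notified []string
}

// subscribeAll runs inside the critical section. Rather than blocking on each
// pending notification, it returns one WaitGroup per notification in flight.
func (r *registry) subscribeAll(names []string) []*sync.WaitGroup {
	r.mu.Lock()
	defer r.mu.Unlock()

	var wgs []*sync.WaitGroup
	for _, name := range names {
		wg := new(sync.WaitGroup)
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			// The notifier needs the same lock, so calling wg.Wait while
			// still holding r.mu would deadlock.
			r.mu.Lock()
			defer r.mu.Unlock()
			r.notified = append(r.notified, name)
		}(name)
		wgs = append(wgs, wg)
	}
	return wgs
}

// subscribe is the top-level entry point: it waits on every WaitGroup only
// after the critical section has been released, and before returning.
func (r *registry) subscribe(names []string) int {
	wgs := r.subscribeAll(names) // the lock is released when this returns
	for _, wg := range wgs {
		wg.Wait()
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	return len(r.notified)
}

func main() {
	r := new(registry)
	fmt.Println(r.subscribe([]string{"a", "b", "c"})) // 3
}
```

Waiting before subscribe returns preserves the guarantee that the caller has been notified of every current value, while deferring the wait until the lock is released avoids the deadlock.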
func (*WatchableValue[T]) Read ¶
func (v *WatchableValue[T]) Read() *ads.Resource[T]
func (*WatchableValue[T]) Set ¶
func (v *WatchableValue[T]) Set(p Priority, r *ads.Resource[T], modifiedAt time.Time)
Set updates the resource to the given version and value and notifies all subscribers of the new value. It is invalid to invoke this method with a nil resource.
func (*WatchableValue[T]) Subscribe ¶
func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T]) *sync.WaitGroup
func (*WatchableValue[T]) Unsubscribe ¶
func (v *WatchableValue[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)