internal

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2026 License: BSD-2-Clause Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// An ExplicitSubscription means the client subscribed to a resource by explicit providing its name.
	ExplicitSubscription = subscriptionType(iota)
	// A GlobSubscription means the client subscribed to a resource by specifying its parent glob
	// collection URL, implicitly subscribing it to all the resources that are part of the collection.
	GlobSubscription
	// A WildcardSubscription means the client subscribed to a resource by specifying the wildcard
	// (ads.WildcardSubscription), implicitly subscribing it to all resources in the cache.
	WildcardSubscription
)

The following subscriptionType constants define the ways a client can subscribe to a resource. See RawCache.Subscribe for additional details.

Variables

This section is empty.

Functions

func SetTimeProvider

func SetTimeProvider(now func() time.Time)

SetTimeProvider can be used to provide an alternative time provider, which is important when benchmarking the cache.

Types

type GlobCollectionsMap

type GlobCollectionsMap[T proto.Message] struct {
	// contains filtered or unexported fields
}

GlobCollectionsMap used to map individual GlobCollectionURL to their corresponding globCollection. This uses a ResourceMap under the hood because it has similar semantics to cache entries:

  1. A globCollection is created lazily, either when an entry for that collection is created, or a subscription to that collection is made.
  2. A globCollection is only deleted once all subscribers have unsubscribed and the collection is empty. Crucially, a collection can be empty but will remain in the cache as long as some subscribers remain subscribed.

func NewGlobCollectionsMap added in v0.2.8

func NewGlobCollectionsMap[T proto.Message]() *GlobCollectionsMap[T]

func (*GlobCollectionsMap[T]) IsSubscribed

func (gcm *GlobCollectionsMap[T]) IsSubscribed(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (subscribed bool)

IsSubscribed checks if the given handler is subscribed to the collection.

func (*GlobCollectionsMap[T]) PutValueInCollection

func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])

PutValueInCollection creates the glob collection if it was not already created, and puts the given value in it.

func (*GlobCollectionsMap[T]) RemoveValueFromCollection

func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T])

RemoveValueFromCollection removes the given value from the collection. If the collection becomes empty as a result, it is removed from the map.

func (*GlobCollectionsMap[T]) Size added in v0.2.9

func (gcm *GlobCollectionsMap[T]) Size(gcURL ads.GlobCollectionURL) (size int)

Size returns the size of the glob collection for the given URL, or 0 if no such collection exists.

func (*GlobCollectionsMap[T]) Subscribe

func (gcm *GlobCollectionsMap[T]) Subscribe(
	gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T],
) (wgs []*sync.WaitGroup)

Subscribe creates or gets the corresponding collection for the given URL using createOrModifyCollection. It adds the given handler as a subscriber to the collection, then iterates through all the values in the collection, notifying the handler for each value. If the collection is empty, the handler will be notified that the resource is deleted. See the documentation on WatchableValue.NotifyHandlerAfterSubscription for more insight on the returned sync.WaitGroup slice.

func (*GlobCollectionsMap[T]) Unsubscribe

func (gcm *GlobCollectionsMap[T]) Unsubscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T])

Unsubscribe invokes globCollection.unsubscribe on the collection for the given URL, if it exists. If, as a result, the collection becomes empty, it invokes deleteCollectionIfEmpty.

type Priority

type Priority int

type ResourceMap

type ResourceMap[K comparable, V any] xsync.Map[K, V]

ResourceMap is a concurrency-safe map. It deliberately does not expose bare Get or Put methods as its concurrency model is based on the assumption that access to the backing values must be strictly synchronized. Instead, all operations should be executed through the various Compute methods.

func NewResourceMap added in v0.2.8

func NewResourceMap[K comparable, V any]() *ResourceMap[K, V]

func (*ResourceMap[K, V]) Compute

func (m *ResourceMap[K, V]) Compute(
	key K,
	newValue func(key K) V,
	compute func(value V),
) (v V, created bool)

Compute first creates the value for the given key using the given function if no corresponding entry exists, then it executes the given compute function. It returns the value itself, and a boolean indicating whether the value was created.

func (*ResourceMap[K, V]) ComputeDeletion added in v0.2.8

func (m *ResourceMap[K, V]) ComputeDeletion(key K, condition func(value V) (deleteEntry bool)) (deleted bool)

ComputeDeletion loads the entry from the map if it still exists, then executes the given condition function with the value. If the condition returns true, the entry is deleted from the map, otherwise nothing happens. As a "compute" function, the condition is executed synchronously, in other words, it is guaranteed that no other "compute" functions are executing on that entry.

func (*ResourceMap[K, V]) ComputeIfPresent

func (m *ResourceMap[K, V]) ComputeIfPresent(key K, f func(value V)) (wasPresent bool)

ComputeIfPresent invokes the given function only if a corresponding entry exists in the map for the given key.

func (*ResourceMap[K, V]) Range added in v0.2.8

func (m *ResourceMap[K, V]) Range() iter.Seq2[K, V]

Range returns an iter.Seq2 that will iterate over all entries in this map.

func (*ResourceMap[K, V]) Size added in v0.2.8

func (m *ResourceMap[K, V]) Size() int

Size returns the current number of entries in the map.

type SubscriberSet

type SubscriberSet[T proto.Message] struct {
	// contains filtered or unexported fields
}

SubscriberSet is a concurrency-safe data structure that stores a set of unique subscribers. It is specifically designed to support wildcard and glob subscriptions such that they can be shared by multiple watchableValues instead of requiring each WatchableValue to store each subscriber. After subscribing to a given value, the SubscriptionHandler is supposed to be notified of the current value immediately, which usually simply means reading WatchableValue.currentValue and notifying the handler. However, it is possible that the notification loop for the WatchableValue is already running, and it could result in a double notification. To avoid this, this data structure introduces a notion of versioning. This way, the notification loop can record which version it is about to iterate over (in WatchableValue.lastSeenSubscriberSetVersions) such that subscribers can determine whether the loop will notify them and avoid the double notification. This is done by recording the version returned by SubscriberSet.Subscribe and checking whether it's equal to or smaller than the version in WatchableValue.lastSeenSubscriberSetVersions.

The implementation uses a sync.Map to store and iterate over the subscribers. In this case it's impossible to use a normal map since the subscriber set will be iterated over frequently. However, sync.Map provides no guarantees about what happens if the map is modified while another goroutine is iterating over the entries. Specifically, if an entry is added during the iteration, the iterator may or may not actually yield the new entry, which means the iterator may yield an entry that was added _after_ Iterator was invoked, violating the Iterator contract that it will only yield entries that were added before. To get around this, the returned iterator simply records the version at which it was initially created, and drops entries that have a greater version, making it always consistent.

func (*SubscriberSet[T]) IsEmpty added in v0.2.6

func (m *SubscriberSet[T]) IsEmpty() bool

IsEmpty is a convenience function that checks whether the set is empty˜.

func (*SubscriberSet[T]) IsSubscribed

func (m *SubscriberSet[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool

IsSubscribed checks whether the given handler is subscribed to this set.

func (*SubscriberSet[T]) Iterator

func (m *SubscriberSet[T]) Iterator() SubscriberSetIterator[T]

Iterator returns a SubscriberSetIterator that will iterate over all the subscribers currently in the set.

func (*SubscriberSet[T]) Size

func (m *SubscriberSet[T]) Size() int

Size returns the number of subscribers in the set. For convenience, returns 0 if the receiver is nil.

func (*SubscriberSet[T]) SnapshotIterator added in v0.2.8

func (m *SubscriberSet[T]) SnapshotIterator(v SubscriberSetVersion) SubscriberSetIterator[T]

SnapshotIterator returns a SubscriberSetIterator that will only iterate over the subscribers that were added before or at the given version.

func (*SubscriberSet[T]) Subscribe

func (m *SubscriberSet[T]) Subscribe(handler ads.SubscriptionHandler[T]) (time.Time, SubscriberSetVersion)

Subscribe registers the given SubscriptionHandler as a subscriber and returns the time and version at which the subscription was processed. The returned version can be compared against the version returned by Iterator to check whether the given handler is present in the iterator.

func (*SubscriberSet[T]) Unsubscribe

func (m *SubscriberSet[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)

Unsubscribe removes the given handler from the set, and returns whether the set is now empty as a result of this unsubscription.

func (*SubscriberSet[T]) Version added in v0.2.8

func (m *SubscriberSet[T]) Version() SubscriberSetVersion

Version returns the current version of this set. Invoking SubscriberSet.SnapshotIterator with the returned version will only yield subscribers added to this set at or before that version.

type SubscriberSetIterator

type SubscriberSetIterator[T proto.Message] iter.Seq2[ads.SubscriptionHandler[T], time.Time]

type SubscriberSetVersion

type SubscriberSetVersion uint64

SubscriberSetVersion is a monotonically increasing counter that tracks how many times subscribers have been added to a given SubscriberSet. This means a subscriber can check whether they are in a SubscriberSet by storing the version returned by SubscriberSet.Subscribe and comparing it against the version returned by SubscriberSet.Iterator.

type WatchableValue

type WatchableValue[T proto.Message] struct {

	// SubscriberSets is holds all the async.SubscriberSet instances relevant to this WatchableValue.
	SubscriberSets [subscriptionTypes]*SubscriberSet[T]
	// contains filtered or unexported fields
}

func NewValue

func NewValue[T proto.Message](
	name string,
	prioritySlots int,
) *WatchableValue[T]

func (*WatchableValue[T]) Clear

func (v *WatchableValue[T]) Clear(p Priority, clearedAt time.Time) (isFullClear bool)

func (*WatchableValue[T]) DeleteNowOrQueueDeletion added in v0.2.6

func (v *WatchableValue[T]) DeleteNowOrQueueDeletion(tryDelete func(name string)) bool

func (*WatchableValue[T]) IsSubscribed

func (v *WatchableValue[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) bool

func (*WatchableValue[T]) NotifyHandlerAfterSubscription

func (v *WatchableValue[T]) NotifyHandlerAfterSubscription(
	handler ads.SubscriptionHandler[T],
	subType subscriptionType,
	subscribedAt time.Time,
	version SubscriberSetVersion,
) *sync.WaitGroup

NotifyHandlerAfterSubscription should be invoked by subscribers after subscribing to the corresponding SubscriberSet. If the returned sync.WaitGroup is nil, the subscriber has been notified of the current value. Otherwise, the subscriber will only be guaranteed to have been notified once the WaitGroup is done (more details can be found in the inline comments of this function, but the high level reason for this is that the notification loop will handle the notification, which avoids double notifications). A WaitGroup is returned instead of this function blocking inline to avoid a number of deadlocks that can occur in glob collections. The WaitGroup should only be waited on *outside* of the glob collection critical sections such as GlobCollectionsMap.Subscribe to avoid these deadlocks.

It is critical that the WaitGroup be waited on before the top-level Subscribe function in the root package returns, as it the API contract established by the cache. It also helps the implementation of the xDS protocol, since it explicitly states the following: https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#subscribing-to-resources

A resource_names_subscribe field may contain resource names that the server believes the client is
already subscribed to, and furthermore has the most recent versions of. However, the server must
still provide those resources in the response; due to implementation details hidden from the
server, the client may have "forgotten" those resources despite apparently remaining subscribed.

func (*WatchableValue[T]) Read

func (v *WatchableValue[T]) Read() *ads.Resource[T]

func (*WatchableValue[T]) Set

func (v *WatchableValue[T]) Set(p Priority, r *ads.Resource[T], modifiedAt time.Time)

Set updates resource to the given version and value and notifies all subscribers of the new value. It is invalid to invoke this method with a nil resource.

func (*WatchableValue[T]) Subscribe

func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T]) *sync.WaitGroup

func (*WatchableValue[T]) Unsubscribe

func (v *WatchableValue[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL