streaming

package
v0.0.0-...-ea5c51a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2025 License: AGPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Transient store key for storing staged events.
	StreamingManagerTransientStoreKey = "tmp_streaming"

	// Key for storing the count of staged events.
	StagedEventsCountKey = "EvtCnt"

	// Key prefix for staged events.
	StagedEventsKeyPrefix = "Evt:"
)

Constants for FullNodeStreamingManager.

Variables

This section is empty.

Functions

func FilterStreamUpdateBySubaccount

func FilterStreamUpdateBySubaccount(
	updates []clobtypes.StreamUpdate,
	subaccountIds []satypes.SubaccountId,
	logger log.Logger,
) []clobtypes.StreamUpdate

If UpdateMessage is not a StreamUpdate_OrderUpdate, filter it. If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, filter it. If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop it. If checking subaccount ids in a StreamUpdate_OrderUpdate results in an error, log the error and drop it.

Types

type FullNodeStreamingManagerImpl

type FullNodeStreamingManagerImpl struct {
	sync.Mutex
	// contains filtered or unexported fields
}

FullNodeStreamingManagerImpl is an implementation for managing streaming subscriptions.

func NewFullNodeStreamingManager

func NewFullNodeStreamingManager(
	logger log.Logger,
	flushIntervalMs uint32,
	maxUpdatesInCache uint32,
	maxSubscriptionChannelSize uint32,
	snapshotBlockInterval uint32,
	streamingManagerTransientStoreKey storetypes.StoreKey,
	cdc codec.BinaryCodec,
) *FullNodeStreamingManagerImpl

func (*FullNodeStreamingManagerImpl) AddOrderUpdatesToCache

func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
	updates []clobtypes.StreamUpdate,
	clobPairIds []uint32,
)

AddOrderUpdatesToCache adds a series of updates to the full node streaming cache. Clob pair ids are the clob pair id each update is relevant to.

func (*FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache

func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache(
	updates []clobtypes.StreamUpdate,
	subaccountIds []*satypes.SubaccountId,
)

AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache. Subaccount ids are the subaccount id each update is relevant to.

func (*FullNodeStreamingManagerImpl) AllClobPairSubscriptionIds

func (sm *FullNodeStreamingManagerImpl) AllClobPairSubscriptionIds() []uint32

func (*FullNodeStreamingManagerImpl) EmitMetrics

func (sm *FullNodeStreamingManagerImpl) EmitMetrics()

func (*FullNodeStreamingManagerImpl) Enabled

func (sm *FullNodeStreamingManagerImpl) Enabled() bool

func (*FullNodeStreamingManagerImpl) FlushStreamUpdates

func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates()

func (*FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock

func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock()

FlushStreamUpdatesWithLock takes in a list of stream updates and their corresponding subscription IDs, and emits them to subscribers. Note this method requires the lock and assumes that the lock has already been acquired by the caller.

func (*FullNodeStreamingManagerImpl) GetPriceSnapshotsForInitStreams

func (sm *FullNodeStreamingManagerImpl) GetPriceSnapshotsForInitStreams(
	getPriceSnapshot func(marketId uint32) *pricestypes.StreamPriceUpdate,
) map[uint32]*pricestypes.StreamPriceUpdate

func (*FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents

func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(
	ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent

GetStagedFinalizeBlockEvents retrieves all events staged during `FinalizeBlock`.

func (*FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams

func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
	getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate

func (*FullNodeStreamingManagerImpl) InitializeNewStreams

func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
	getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
	subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
	pricesSnapshots map[uint32]*pricestypes.StreamPriceUpdate,
	blockHeight uint32,
	execMode sdk.ExecMode,
)

func (*FullNodeStreamingManagerImpl) NewOrderbookSubscription

func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription(
	clobPairIds []uint32,
	subaccountIds []satypes.SubaccountId,
	marketIds []uint32,
	messageSender types.OutgoingMessageSender,
) *OrderbookSubscription

func (*FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull

func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull()

RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows. Note this method requires the lock and assumes that the lock has already been acquired by the caller.

func (*FullNodeStreamingManagerImpl) SendCombinedSnapshot

func (sm *FullNodeStreamingManagerImpl) SendCombinedSnapshot(
	offchainUpdates *clobtypes.OffchainUpdates,
	saUpdates []*satypes.StreamSubaccountUpdate,
	priceUpdates []*pricestypes.StreamPriceUpdate,
	subscriptionId uint32,
	blockHeight uint32,
	execMode sdk.ExecMode,
)

SendCombinedSnapshot sends messages to a particular subscriber without buffering. Note this method requires the lock and assumes that the lock has already been acquired by the caller.

func (*FullNodeStreamingManagerImpl) SendOrderbookFillUpdate

func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(
	orderbookFill clobtypes.StreamOrderbookFill,
	ctx sdk.Context,
	perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)

SendOrderbookFillUpdate groups fills by their clob pair ids and sends messages to the subscribers.

func (*FullNodeStreamingManagerImpl) SendOrderbookUpdates

func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
	offchainUpdates *clobtypes.OffchainUpdates,
	ctx sdk.Context,
)

SendOrderbookUpdates groups updates by their clob pair ids and sends messages to the subscribers.

func (*FullNodeStreamingManagerImpl) SendPriceUpdate

func (sm *FullNodeStreamingManagerImpl) SendPriceUpdate(
	ctx sdk.Context,
	priceUpdate pricestypes.StreamPriceUpdate,
)

SendPriceUpdate sends price updates to the subscribers.

func (*FullNodeStreamingManagerImpl) SendSubaccountUpdate

func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
	ctx sdk.Context,
	subaccountUpdate satypes.StreamSubaccountUpdate,
)

SendSubaccountUpdate sends a subaccount update event.

func (*FullNodeStreamingManagerImpl) SendTakerOrderStatus

func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
	streamTakerOrder clobtypes.StreamTakerOrder,
	ctx sdk.Context,
)

SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.

func (*FullNodeStreamingManagerImpl) Stop

func (sm *FullNodeStreamingManagerImpl) Stop()

func (*FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock

func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
	ctx sdk.Context,
	orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
	perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)

gRPC streaming logic that runs after consensus agrees on a block: (1) stream all events staged during `FinalizeBlock`; (2) stream orderbook updates to sync fills in the local ops queue.

func (*FullNodeStreamingManagerImpl) Subscribe

func (sm *FullNodeStreamingManagerImpl) Subscribe(
	clobPairIds []uint32,
	subaccountIds []*satypes.SubaccountId,
	marketIds []uint32,
	filterOrdersBySubAccountId bool,
	messageSender types.OutgoingMessageSender,
) (
	err error,
)

func (*FullNodeStreamingManagerImpl) TracksMarketId

func (sm *FullNodeStreamingManagerImpl) TracksMarketId(marketId uint32) bool

TracksMarketId checks if a market id is being tracked by the streaming manager.

func (*FullNodeStreamingManagerImpl) TracksSubaccountId

func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.SubaccountId) bool

TracksSubaccountId checks if a subaccount id is being tracked by the streaming manager.

type NoopGrpcStreamingManager

type NoopGrpcStreamingManager struct{}

func NewNoopGrpcStreamingManager

func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager

func (*NoopGrpcStreamingManager) Enabled

func (sm *NoopGrpcStreamingManager) Enabled() bool

func (*NoopGrpcStreamingManager) GetPriceSnapshotsForInitStreams

func (sm *NoopGrpcStreamingManager) GetPriceSnapshotsForInitStreams(
	_ func(_ uint32) *pricestypes.StreamPriceUpdate,
) map[uint32]*pricestypes.StreamPriceUpdate

func (*NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents

func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents(
	ctx sdk.Context,
) []clobtypes.StagedFinalizeBlockEvent

func (*NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams

func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams(
	getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate,
) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate

func (*NoopGrpcStreamingManager) InitializeNewStreams

func (sm *NoopGrpcStreamingManager) InitializeNewStreams(
	getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates,
	subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate,
	priceSnapshots map[uint32]*pricestypes.StreamPriceUpdate,
	blockHeight uint32,
	execMode sdk.ExecMode,
)

func (*NoopGrpcStreamingManager) SendOrderbookFillUpdate

func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate(
	orderbookFill clobtypes.StreamOrderbookFill,
	ctx sdk.Context,
	perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)

func (*NoopGrpcStreamingManager) SendOrderbookUpdates

func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(
	updates *clobtypes.OffchainUpdates,
	ctx sdk.Context,
)

func (*NoopGrpcStreamingManager) SendPriceUpdate

func (sm *NoopGrpcStreamingManager) SendPriceUpdate(
	ctx sdk.Context,
	priceUpdate pricestypes.StreamPriceUpdate,
)

func (*NoopGrpcStreamingManager) SendSubaccountUpdate

func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate(
	ctx sdk.Context,
	subaccountUpdate satypes.StreamSubaccountUpdate,
)

func (*NoopGrpcStreamingManager) SendTakerOrderStatus

func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(
	takerOrder clobtypes.StreamTakerOrder,
	ctx sdk.Context,
)

func (*NoopGrpcStreamingManager) Stop

func (sm *NoopGrpcStreamingManager) Stop()

func (*NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock

func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock(
	ctx sdk.Context,
	orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates,
	perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
)

func (*NoopGrpcStreamingManager) Subscribe

func (sm *NoopGrpcStreamingManager) Subscribe(
	_ []uint32,
	_ []*satypes.SubaccountId,
	_ []uint32,
	_ bool,
	_ types.OutgoingMessageSender,
) (
	err error,
)

func (*NoopGrpcStreamingManager) TracksMarketId

func (sm *NoopGrpcStreamingManager) TracksMarketId(id uint32) bool

func (*NoopGrpcStreamingManager) TracksSubaccountId

func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool

type OrderbookSubscription

type OrderbookSubscription struct {
	// contains filtered or unexported fields
}

OrderbookSubscription represents an active subscription to the orderbook updates stream.

func (*OrderbookSubscription) IsInitialized

func (sub *OrderbookSubscription) IsInitialized() bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL