Documentation
¶
Index ¶
- Constants
- func FilterStreamUpdateBySubaccount(updates []clobtypes.StreamUpdate, subaccountIds []satypes.SubaccountId, ...) []clobtypes.StreamUpdate
- type FullNodeStreamingManagerImpl
- func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(updates []clobtypes.StreamUpdate, clobPairIds []uint32)
- func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache(updates []clobtypes.StreamUpdate, subaccountIds []*satypes.SubaccountId)
- func (sm *FullNodeStreamingManagerImpl) AllClobPairSubscriptionIds() []uint32
- func (sm *FullNodeStreamingManagerImpl) EmitMetrics()
- func (sm *FullNodeStreamingManagerImpl) Enabled() bool
- func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates()
- func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock()
- func (sm *FullNodeStreamingManagerImpl) GetPriceSnapshotsForInitStreams(getPriceSnapshot func(marketId uint32) *pricestypes.StreamPriceUpdate) map[uint32]*pricestypes.StreamPriceUpdate
- func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents(ctx sdk.Context) []clobtypes.StagedFinalizeBlockEvent
- func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(...) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
- func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(...)
- func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription(clobPairIds []uint32, subaccountIds []satypes.SubaccountId, marketIds []uint32, ...) *OrderbookSubscription
- func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull()
- func (sm *FullNodeStreamingManagerImpl) SendCombinedSnapshot(offchainUpdates *clobtypes.OffchainUpdates, ...)
- func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate(orderbookFill clobtypes.StreamOrderbookFill, ctx sdk.Context, ...)
- func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(offchainUpdates *clobtypes.OffchainUpdates, ctx sdk.Context)
- func (sm *FullNodeStreamingManagerImpl) SendPriceUpdate(ctx sdk.Context, priceUpdate pricestypes.StreamPriceUpdate)
- func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate)
- func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(streamTakerOrder clobtypes.StreamTakerOrder, ctx sdk.Context)
- func (sm *FullNodeStreamingManagerImpl) Stop()
- func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(ctx sdk.Context, ...)
- func (sm *FullNodeStreamingManagerImpl) Subscribe(clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, ...) (err error)
- func (sm *FullNodeStreamingManagerImpl) TracksMarketId(marketId uint32) bool
- func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.SubaccountId) bool
- type NoopGrpcStreamingManager
- func (sm *NoopGrpcStreamingManager) Enabled() bool
- func (sm *NoopGrpcStreamingManager) GetPriceSnapshotsForInitStreams(_ func(_ uint32) *pricestypes.StreamPriceUpdate) map[uint32]*pricestypes.StreamPriceUpdate
- func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents(ctx sdk.Context) []clobtypes.StagedFinalizeBlockEvent
- func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams(...) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
- func (sm *NoopGrpcStreamingManager) InitializeNewStreams(...)
- func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate(orderbookFill clobtypes.StreamOrderbookFill, ctx sdk.Context, ...)
- func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates(updates *clobtypes.OffchainUpdates, ctx sdk.Context)
- func (sm *NoopGrpcStreamingManager) SendPriceUpdate(ctx sdk.Context, priceUpdate pricestypes.StreamPriceUpdate)
- func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate(ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate)
- func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus(takerOrder clobtypes.StreamTakerOrder, ctx sdk.Context)
- func (sm *NoopGrpcStreamingManager) Stop()
- func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock(ctx sdk.Context, ...)
- func (sm *NoopGrpcStreamingManager) Subscribe(_ []uint32, _ []*satypes.SubaccountId, _ []uint32, _ bool, ...) (err error)
- func (sm *NoopGrpcStreamingManager) TracksMarketId(id uint32) bool
- func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool
- type OrderbookSubscription
Constants ¶
const ( // Transient store key for storing staged events. StreamingManagerTransientStoreKey = "tmp_streaming" // Key for storing the count of staged events. StagedEventsCountKey = "EvtCnt" // Key prefix for staged events. StagedEventsKeyPrefix = "Evt:" )
Constants for FullNodeStreamingManager.
Variables ¶
This section is empty.
Functions ¶
func FilterStreamUpdateBySubaccount ¶
func FilterStreamUpdateBySubaccount( updates []clobtypes.StreamUpdate, subaccountIds []satypes.SubaccountId, logger log.Logger, ) []clobtypes.StreamUpdate
FilterStreamUpdateBySubaccount filters the given updates by the subscribed subaccount ids. If UpdateMessage is not a StreamUpdate_OrderUpdate, keep it. If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, keep it. If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop it. If checking subaccount ids in a StreamUpdate_OrderUpdate results in an error, log the error and drop it.
Types ¶
type FullNodeStreamingManagerImpl ¶
FullNodeStreamingManagerImpl is an implementation for managing streaming subscriptions.
func NewFullNodeStreamingManager ¶
func NewFullNodeStreamingManager( logger log.Logger, flushIntervalMs uint32, maxUpdatesInCache uint32, maxSubscriptionChannelSize uint32, snapshotBlockInterval uint32, streamingManagerTransientStoreKey storetypes.StoreKey, cdc codec.BinaryCodec, ) *FullNodeStreamingManagerImpl
func (*FullNodeStreamingManagerImpl) AddOrderUpdatesToCache ¶
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( updates []clobtypes.StreamUpdate, clobPairIds []uint32, )
AddOrderUpdatesToCache adds a series of updates to the full node streaming cache. clobPairIds holds, for each update, the clob pair id that the update is relevant to.
func (*FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache ¶
func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( updates []clobtypes.StreamUpdate, subaccountIds []*satypes.SubaccountId, )
AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache. subaccountIds holds, for each update, the subaccount id that the update is relevant to.
func (*FullNodeStreamingManagerImpl) AllClobPairSubscriptionIds ¶
func (sm *FullNodeStreamingManagerImpl) AllClobPairSubscriptionIds() []uint32
func (*FullNodeStreamingManagerImpl) EmitMetrics ¶
func (sm *FullNodeStreamingManagerImpl) EmitMetrics()
func (*FullNodeStreamingManagerImpl) Enabled ¶
func (sm *FullNodeStreamingManagerImpl) Enabled() bool
func (*FullNodeStreamingManagerImpl) FlushStreamUpdates ¶
func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdates()
func (*FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock ¶
func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock()
FlushStreamUpdatesWithLock emits the buffered stream updates to their corresponding subscribers. Note this method requires the lock and assumes that the lock has already been acquired by the caller.
func (*FullNodeStreamingManagerImpl) GetPriceSnapshotsForInitStreams ¶
func (sm *FullNodeStreamingManagerImpl) GetPriceSnapshotsForInitStreams( getPriceSnapshot func(marketId uint32) *pricestypes.StreamPriceUpdate, ) map[uint32]*pricestypes.StreamPriceUpdate
func (*FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents ¶
func (sm *FullNodeStreamingManagerImpl) GetStagedFinalizeBlockEvents( ctx sdk.Context, ) []clobtypes.StagedFinalizeBlockEvent
GetStagedFinalizeBlockEvents retrieves all events staged during `FinalizeBlock`.
func (*FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams ¶
func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams( getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
func (*FullNodeStreamingManagerImpl) InitializeNewStreams ¶
func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, pricesSnapshots map[uint32]*pricestypes.StreamPriceUpdate, blockHeight uint32, execMode sdk.ExecMode, )
func (*FullNodeStreamingManagerImpl) NewOrderbookSubscription ¶
func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription( clobPairIds []uint32, subaccountIds []satypes.SubaccountId, marketIds []uint32, messageSender types.OutgoingMessageSender, ) *OrderbookSubscription
func (*FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull ¶
func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull()
RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if the buffer overflows. Note this method requires the lock and assumes that the lock has already been acquired by the caller.
func (*FullNodeStreamingManagerImpl) SendCombinedSnapshot ¶
func (sm *FullNodeStreamingManagerImpl) SendCombinedSnapshot( offchainUpdates *clobtypes.OffchainUpdates, saUpdates []*satypes.StreamSubaccountUpdate, priceUpdates []*pricestypes.StreamPriceUpdate, subscriptionId uint32, blockHeight uint32, execMode sdk.ExecMode, )
SendCombinedSnapshot sends messages to a particular subscriber without buffering. Note this method requires the lock and assumes that the lock has already been acquired by the caller.
func (*FullNodeStreamingManagerImpl) SendOrderbookFillUpdate ¶
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, )
SendOrderbookFillUpdate groups fills by their clob pair ids and sends messages to the subscribers.
func (*FullNodeStreamingManagerImpl) SendOrderbookUpdates ¶
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates( offchainUpdates *clobtypes.OffchainUpdates, ctx sdk.Context, )
SendOrderbookUpdates groups updates by their clob pair ids and sends messages to the subscribers.
func (*FullNodeStreamingManagerImpl) SendPriceUpdate ¶
func (sm *FullNodeStreamingManagerImpl) SendPriceUpdate( ctx sdk.Context, priceUpdate pricestypes.StreamPriceUpdate, )
SendPriceUpdate sends a price update to the subscribers.
func (*FullNodeStreamingManagerImpl) SendSubaccountUpdate ¶
func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, )
SendSubaccountUpdate sends a subaccount update event.
func (*FullNodeStreamingManagerImpl) SendTakerOrderStatus ¶
func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus( streamTakerOrder clobtypes.StreamTakerOrder, ctx sdk.Context, )
SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
func (*FullNodeStreamingManagerImpl) Stop ¶
func (sm *FullNodeStreamingManagerImpl) Stop()
func (*FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock ¶
func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( ctx sdk.Context, orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, )
StreamBatchUpdatesAfterFinalizeBlock runs the gRPC streaming logic after consensus agrees on a block: - Stream all events staged during `FinalizeBlock`. - Stream orderbook updates to sync fills in local ops queue.
func (*FullNodeStreamingManagerImpl) Subscribe ¶
func (sm *FullNodeStreamingManagerImpl) Subscribe( clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, marketIds []uint32, filterOrdersBySubAccountId bool, messageSender types.OutgoingMessageSender, ) ( err error, )
func (*FullNodeStreamingManagerImpl) TracksMarketId ¶
func (sm *FullNodeStreamingManagerImpl) TracksMarketId(marketId uint32) bool
TracksMarketId checks if a market id is being tracked by the streaming manager.
func (*FullNodeStreamingManagerImpl) TracksSubaccountId ¶
func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.SubaccountId) bool
TracksSubaccountId checks if a subaccount id is being tracked by the streaming manager.
type NoopGrpcStreamingManager ¶
type NoopGrpcStreamingManager struct{}
func NewNoopGrpcStreamingManager ¶
func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager
func (*NoopGrpcStreamingManager) Enabled ¶
func (sm *NoopGrpcStreamingManager) Enabled() bool
func (*NoopGrpcStreamingManager) GetPriceSnapshotsForInitStreams ¶
func (sm *NoopGrpcStreamingManager) GetPriceSnapshotsForInitStreams( _ func(_ uint32) *pricestypes.StreamPriceUpdate, ) map[uint32]*pricestypes.StreamPriceUpdate
func (*NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents ¶
func (sm *NoopGrpcStreamingManager) GetStagedFinalizeBlockEvents( ctx sdk.Context, ) []clobtypes.StagedFinalizeBlockEvent
func (*NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams ¶
func (sm *NoopGrpcStreamingManager) GetSubaccountSnapshotsForInitStreams( getSubaccountSnapshot func(subaccountId satypes.SubaccountId) *satypes.StreamSubaccountUpdate, ) map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate
func (*NoopGrpcStreamingManager) InitializeNewStreams ¶
func (sm *NoopGrpcStreamingManager) InitializeNewStreams( getOrderbookSnapshot func(clobPairId clobtypes.ClobPairId) *clobtypes.OffchainUpdates, subaccountSnapshots map[satypes.SubaccountId]*satypes.StreamSubaccountUpdate, priceSnapshots map[uint32]*pricestypes.StreamPriceUpdate, blockHeight uint32, execMode sdk.ExecMode, )
func (*NoopGrpcStreamingManager) SendOrderbookFillUpdate ¶
func (sm *NoopGrpcStreamingManager) SendOrderbookFillUpdate( orderbookFill clobtypes.StreamOrderbookFill, ctx sdk.Context, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, )
func (*NoopGrpcStreamingManager) SendOrderbookUpdates ¶
func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( updates *clobtypes.OffchainUpdates, ctx sdk.Context, )
func (*NoopGrpcStreamingManager) SendPriceUpdate ¶
func (sm *NoopGrpcStreamingManager) SendPriceUpdate( ctx sdk.Context, priceUpdate pricestypes.StreamPriceUpdate, )
func (*NoopGrpcStreamingManager) SendSubaccountUpdate ¶
func (sm *NoopGrpcStreamingManager) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, )
func (*NoopGrpcStreamingManager) SendTakerOrderStatus ¶
func (sm *NoopGrpcStreamingManager) SendTakerOrderStatus( takerOrder clobtypes.StreamTakerOrder, ctx sdk.Context, )
func (*NoopGrpcStreamingManager) Stop ¶
func (sm *NoopGrpcStreamingManager) Stop()
func (*NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock ¶
func (sm *NoopGrpcStreamingManager) StreamBatchUpdatesAfterFinalizeBlock( ctx sdk.Context, orderBookUpdatesToSyncLocalOpsQueue *clobtypes.OffchainUpdates, perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId, )
func (*NoopGrpcStreamingManager) Subscribe ¶
func (sm *NoopGrpcStreamingManager) Subscribe( _ []uint32, _ []*satypes.SubaccountId, _ []uint32, _ bool, _ types.OutgoingMessageSender, ) ( err error, )
func (*NoopGrpcStreamingManager) TracksMarketId ¶
func (sm *NoopGrpcStreamingManager) TracksMarketId(id uint32) bool
func (*NoopGrpcStreamingManager) TracksSubaccountId ¶
func (sm *NoopGrpcStreamingManager) TracksSubaccountId(id satypes.SubaccountId) bool
type OrderbookSubscription ¶
type OrderbookSubscription struct {
// contains filtered or unexported fields
}
OrderbookSubscription represents an active subscription to the orderbook updates stream.
func (*OrderbookSubscription) IsInitialized ¶
func (sub *OrderbookSubscription) IsInitialized() bool