Documentation
¶
Index ¶
- Constants
- Variables
- func Exists(dirname string) (bool, error)
- func GetByHash[T NativeObject](orm *ORM, cid rdx.ID, fid uint32, tlv []byte) (T, error)
- func SeekClass[T NativeObject](orm *ORM, cid rdx.ID) iter.Seq[T]
- type CallHook
- type Chotki
- func (cho *Chotki) AddHook(fid rdx.ID, hook Hook)
- func (cho *Chotki) AddToNField(ctx context.Context, fid rdx.ID, count uint64) (id rdx.ID, err error)
- func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)
- func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)
- func (cho *Chotki) ApplyH(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) ApplyV(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error)
- func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, except string)
- func (cho *Chotki) ClassFields(cid rdx.ID) (fields classes.Fields, err error)
- func (cho *Chotki) Clock() rdx.Clock
- func (cho *Chotki) Close() error
- func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)
- func (cho *Chotki) Connect(addr string) error
- func (cho *Chotki) ConnectPool(name string, addrs []string) error
- func (cho *Chotki) Counter(rid rdx.ID, offset uint64, updatePeriod time.Duration) *counters.AtomicCounter
- func (cho *Chotki) Database() *pebble.DB
- func (cho *Chotki) Directory() string
- func (cho *Chotki) Disconnect(addr string) error
- func (cho *Chotki) Drain(ctx context.Context, recs protocol.Records) (err error)
- func (cho *Chotki) DumpAll(writer io.Writer)
- func (cho *Chotki) DumpObjects(writer io.Writer)
- func (cho *Chotki) DumpVV(writer io.Writer)
- func (cho *Chotki) EditFieldTLV(ctx context.Context, fid rdx.ID, delta []byte) (id rdx.ID, err error)
- func (cho *Chotki) EditObjectRDX(ctx context.Context, oid rdx.ID, pairs []rdx.RDX) (id rdx.ID, err error)
- func (cho *Chotki) GetClassTLV(ctx context.Context, cid rdx.ID) ([]byte, error)
- func (cho *Chotki) GetFieldTLV(id rdx.ID) (rdt byte, tlv []byte)
- func (cho *Chotki) IncNField(ctx context.Context, fid rdx.ID) (id rdx.ID, err error)
- func (cho *Chotki) Last() rdx.ID
- func (cho *Chotki) Listen(addr string) error
- func (cho *Chotki) Logger() utils.Logger
- func (cho *Chotki) MapTRField(fid rdx.ID) (themap rdx.MapTR, err error)
- func (cho *Chotki) Metrics() []prometheus.Collector
- func (cho *Chotki) NewClass(ctx context.Context, parent rdx.ID, fields ...classes.Field) (id rdx.ID, err error)
- func (cho *Chotki) NewObjectTLV(ctx context.Context, tid rdx.ID, fields protocol.Records) (id rdx.ID, err error)
- func (cho *Chotki) ObjectFieldTLV(fid rdx.ID) (rdt byte, tlv []byte, err error)
- func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl classes.Fields, fact protocol.Records, err error)
- func (cho *Chotki) ObjectIterator(oid rdx.ID, snap *pebble.Snapshot) *pebble.Iterator
- func (cho *Chotki) ObjectMapper() *ORM
- func (cho *Chotki) ObjectString(oid rdx.ID) (txt string, err error)
- func (cho *Chotki) RemoveAllHooks(fid rdx.ID)
- func (cho *Chotki) RemoveHook(fid rdx.ID, hook Hook) (err error)
- func (cho *Chotki) Snapshot() pebble.Reader
- func (cho *Chotki) Source() uint64
- func (cho *Chotki) Unlisten(addr string) error
- func (cho *Chotki) UpdateVTree(id, ref rdx.ID, pb *pebble.Batch) (err error)
- func (cho *Chotki) VersionVector() (vv rdx.VV, err error)
- func (cho *Chotki) WriteOptions() *pebble.WriteOptions
- type ChotkiCollector
- type Hook
- type Merger
- type NativeObject
- type NetCollector
- type ORM
- func (orm *ORM) Clear() error
- func (orm *ORM) Close() error
- func (orm *ORM) FindID(obj NativeObject) rdx.ID
- func (orm *ORM) GetIdByHash(cid rdx.ID, fid uint32, tlv []byte) (rdx.ID, error)
- func (orm *ORM) Load(id rdx.ID, blanc NativeObject, skipFields ...uint64) (obj NativeObject, err error)
- func (orm *ORM) New(ctx context.Context, cid rdx.ID, objs ...NativeObject) (err error)
- func (orm *ORM) Object(id rdx.ID) NativeObject
- func (orm *ORM) Save(ctx context.Context, objs ...NativeObject) (err error)
- func (orm *ORM) SaveAll(ctx context.Context) (err error)
- func (orm *ORM) SeekIds(cid rdx.ID) iter.Seq[rdx.ID]
- func (orm *ORM) SyncAll(ctx context.Context) (err error)
- func (orm *ORM) UpdateAll() (err error)
- func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error
- type Options
- type PebbleCollector
- type PebbleMergeAdaptor
Constants ¶
const YAckOff = uint64(2)
Variables ¶
var ( ErrDbClosed = errors.New("chotki: db is closed") ErrDirnameIsFile = errors.New("chotki: the dirname is file") ErrNotImplemented = errors.New("chotki: not implemented yet") ErrHookNotFound = errors.New("chotki: hook not found") ErrBadIRecord = errors.New("chotki: bad id-ref record") ErrBadORecord = errors.New("chotki: bad id-ref record") ErrBadEPacket = errors.New("chotki: bad E packet") ErrBadVPacket = errors.New("chotki: bad V packet") ErrBadYPacket = errors.New("chotki: bad Y packet") ErrBadLPacket = errors.New("chotki: bad L packet") ErrBadTPacket = errors.New("chotki: bad T packet") ErrBadOPacket = errors.New("chotki: bad O packet") ErrSrcUnknown = errors.New("chotki: source unknown") ErrSyncUnknown = errors.New("chotki: sync session unknown") ErrBadRRecord = errors.New("chotki: bad ref record") ErrBadTypeDescription = errors.New("chotki: bad type description") ErrUnknownFieldInAType = errors.New("chotki: unknown field for the type") ErrBadClass = errors.New("chotki: bad class description") ErrOutOfOrder = errors.New("chotki: order fail: sequence gap") ErrCausalityBroken = errors.New("chotki: order fail: refs an unknown op") )
var ( IdNames = id2.ToOff(1) IdNodes = id2.ToOff(2) IdNodeInfo = id2.ToOff(3) // ID from which we count user static objects IdLog1 = id2.ToOff(4) )
var DiffSyncSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "chotki", Name: "diff_sync_size", Buckets: []float64{ 1, 10, 100, 1000, 5000, 10000, 50000, 100000, 500000, 1000000, 2000000, 5000000, 10000000, 20000000, 50000000, 100000000, 200000000, 500000000, 1000000000, 2000000000, 5000000000, }, })
Size of diff sync in bytes
var DrainTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "chotki", Name: "drain_time", Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 100, 500, 1000, 5000, 10000}, }, []string{"type"})
var ErrOffsetOpId = errors.New("op id is offset")
var ErrWrongFieldType = errors.New("wrong field type")
var EventsBatchSize = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "chotki", Name: "batch_size", Buckets: []float64{0, 1, 10, 50, 100, 500, 1000, 10000, 100000, 1000000}, })
var EventsMetric = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "chotki",
Name: "packet_count",
})
var EventsOutboundMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "chotki", Name: "outbound_packet_count", }, []string{"name"})
var Log0 = protocol.Records{ protocol.Record('Y', protocol.Record('I', rdx.ID0.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), ), protocol.Record('C', protocol.Record('I', id1.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), rdx.Atlv(rdx.LogT{ "M\x00Names", }), ), protocol.Record('O', protocol.Record('I', id2.ZipBytes()), protocol.Record('R', id1.ZipBytes()), protocol.Record('M', protocol.Record('T', rdx.Ttlv("0")), protocol.Record('R', rdx.Rtlv(rdx.ID0)), protocol.Record('T', rdx.Ttlv("Global")), protocol.Record('R', rdx.Rtlv(id2)), protocol.Record('T', rdx.Ttlv("Names")), protocol.Record('R', rdx.Rtlv(IdNames)), protocol.Record('T', rdx.Ttlv("Nodes")), protocol.Record('R', rdx.Rtlv(IdNodes)), ), ), protocol.Record('C', protocol.Record('I', id3.ZipBytes()), protocol.Record('R', rdx.ID0.ZipBytes()), rdx.Atlv(rdx.LogT{ "S\x00Name", "V\x00Ack", "S\x00Addr", }), ), }
Functions ¶
Types ¶
type Chotki ¶
type Chotki struct {
IndexManager *indexes.IndexManager
// contains filtered or unexported fields
}
Main Chotki struct
func (*Chotki) AddToNField ¶
func (cho *Chotki) AddToNField(ctx context.Context, fid rdx.ID, count uint64) (id rdx.ID, err error)
Increments only 'N' counters by arbitrary amount.
func (*Chotki) ApplyC ¶
func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)
Handles 'C' packets which can occur from applying local changes or live sync. Creates or updates a class definition. Classes are special objects. They are stored in a separate key range in pebble. They also have simpler behaviour in create/update scenarios: they are just created/replaced as is. All classes are created with rid = rdx.ID0; if you pass another ref, it should be the real ref of what you want to edit.
func (*Chotki) ApplyD ¶
During the diff sync it handles the 'D' packets, which most of the time contain a single block (see the replication protocol description). It does not immediately apply the changes to the DB, using a batch instead. The batch will be applied when we finish the diff sync, i.e. when we receive the 'V' packet.
func (*Chotki) ApplyE ¶
func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error)
Handles 'E' packets which can occur from applying local changes or live sync. Edits object fields. Unlike ApplyOY, it does not assume that we update the whole object, as we can update individual fields. It also sets the current replica src id for FIRST/MEL types. Otherwise it just merges bytes into the batch.
func (*Chotki) ApplyH ¶
During the diff sync it handles the 'H' packets, which contain the version vector of the other replica. We put their version vector into the batch to update ours after the diff sync.
func (*Chotki) ApplyOY ¶
Handles 'O' packets which can occur from applying local changes or live sync. Creates objects ('O'). 'Y' are a special kind of object, unused at the moment. Typically expects an 'O' packet: record 0 is the 'O' record that contains the class rdx.ID; records 1 and onward are the class fields. First it creates the 'O' field. Then it goes through the rest of the fields encoded as TLV. The only transformation it does: it sets the current replica src id for FIRST/MEL types, because historically they are not set when creating those fields (for convenience?)
func (*Chotki) ApplyV ¶
During the diff sync it handles the 'V' packets. This packet effectively completes the diff sync. It contains the version vector of the blocks we synced during the diff sync. We put them in the batch to update our blocks' version vectors.
func (*Chotki) Broadcast ¶
Broadcasts some records to all active replication sessions that this replica has, except one.
func (*Chotki) ClassFields ¶
Returns class fields and caches them. If class is changed cache will be invalidated.
func (*Chotki) CommitPacket ¶
func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error)
Commits records to actual storage (pebble) and broadcasts the update to all active replication sessions. Increments replica last rdx.ID and stamps this update with it. This id will be used as an ID of a new object (if it is an object), also this will be the latest seen rdx.ID in the version vector. Uses an exclusive lock, so all commits are serialized, but remember that Drain/drain calls are not. All replication sessions will call Drain in parallel.
func (*Chotki) ConnectPool ¶
Connects to the given address pool.
func (*Chotki) Counter ¶
func (cho *Chotki) Counter(rid rdx.ID, offset uint64, updatePeriod time.Duration) *counters.AtomicCounter
Returns an atomic counter object that can be used to increment/decrement a counter.
func (*Chotki) Disconnect ¶
Disconnects from the given address.
func (*Chotki) DumpObjects ¶
Dumps all objects to the writer.
func (*Chotki) EditFieldTLV ¶
func (cho *Chotki) EditFieldTLV(ctx context.Context, fid rdx.ID, delta []byte) (id rdx.ID, err error)
Edits the object field using TLV-encoded value.
func (*Chotki) EditObjectRDX ¶
func (cho *Chotki) EditObjectRDX(ctx context.Context, oid rdx.ID, pairs []rdx.RDX) (id rdx.ID, err error)
Edits the object fields using string RDX representation.
func (*Chotki) GetClassTLV ¶
Returns the TLV-encoded class definition. Only used in REPL.
func (*Chotki) GetFieldTLV ¶
Returns the TLV-encoded value of the object field, given object rdx.ID with offset.
func (*Chotki) MapTRField ¶
Extracts TLV value of the field that is supposed to be a map and converts it to golang map.
func (*Chotki) Metrics ¶
func (cho *Chotki) Metrics() []prometheus.Collector
Returns a list of prometheus collectors for the Chotki instance.
func (*Chotki) NewClass ¶
func (cho *Chotki) NewClass(ctx context.Context, parent rdx.ID, fields ...classes.Field) (id rdx.ID, err error)
Creates or updates class definition. Remember that you can only add fields, not remove them.
func (*Chotki) NewObjectTLV ¶
func (cho *Chotki) NewObjectTLV(ctx context.Context, tid rdx.ID, fields protocol.Records) (id rdx.ID, err error)
Thin wrapper around CommitPacket with 'O' type
func (*Chotki) ObjectFieldTLV ¶
Returns the TLV-encoded value of the object field, given object rdx.ID with offset.
func (*Chotki) ObjectFields ¶
func (cho *Chotki) ObjectFields(oid rdx.ID) (tid rdx.ID, decl classes.Fields, fact protocol.Records, err error)
Given object id, returns class definition and actual object fields values
func (*Chotki) ObjectIterator ¶
Creates an iterator that only walks through the specific object. Will return nil if the object does not exist.
func (*Chotki) ObjectMapper ¶
Returns the new instance of the ORM object
func (*Chotki) ObjectString ¶
Returns the string representation of the object.
func (*Chotki) RemoveAllHooks ¶
func (*Chotki) UpdateVTree ¶
Updates the Vkey0, which stores version vectors for different replicas. But also updates the corresponding block (check replication protocol description) version vector. Version vector is a map: replica src id -> latest seen rdx.ID
func (*Chotki) VersionVector ¶
Returns the version vector of the Chotki instance as rdx.VV structure.
func (*Chotki) WriteOptions ¶
func (cho *Chotki) WriteOptions() *pebble.WriteOptions
Returns the write options of the Chotki instance.
type ChotkiCollector ¶
type ChotkiCollector struct {
// contains filtered or unexported fields
}
func NewChotkiCollector ¶
func NewChotkiCollector(chotki *Chotki) *ChotkiCollector
func (*ChotkiCollector) Collect ¶
func (n *ChotkiCollector) Collect(m chan<- prometheus.Metric)
func (*ChotkiCollector) Describe ¶
func (c *ChotkiCollector) Describe(d chan<- *prometheus.Desc)
type NativeObject ¶
type NetCollector ¶
type NetCollector struct {
// contains filtered or unexported fields
}
func NewNetCollector ¶
func NewNetCollector(net *network.Net) *NetCollector
func (*NetCollector) Collect ¶
func (n *NetCollector) Collect(m chan<- prometheus.Metric)
func (*NetCollector) Describe ¶
func (n *NetCollector) Describe(d chan<- *prometheus.Desc)
type ORM ¶
func (*ORM) FindID ¶
func (orm *ORM) FindID(obj NativeObject) rdx.ID
Find the ID of the registered object.
func (*ORM) GetIdByHash ¶
Use hash index to find an object rdx.ID by its hash.
func (*ORM) Load ¶
func (orm *ORM) Load(id rdx.ID, blanc NativeObject, skipFields ...uint64) (obj NativeObject, err error)
Load the object's state from the db, register the object. If an object is already registered for that id, returns the old one. The new one is not used then.
func (*ORM) Object ¶
func (orm *ORM) Object(id rdx.ID) NativeObject
Find a registered object given its id. nil if none.
func (*ORM) Save ¶
func (orm *ORM) Save(ctx context.Context, objs ...NativeObject) (err error)
Save the registered object's changes. Much faster than SaveAll(), especially if you loaded many objects and modified few.
func (*ORM) SaveAll ¶
SaveAll the changed fields; this will scan the objects and their database records.
func (*ORM) UpdateObject ¶
func (orm *ORM) UpdateObject(obj NativeObject, snap *pebble.Snapshot) error
type Options ¶
type Options struct {
pebble.Options
Src uint64
Name string
Log1 protocol.Records
Logger utils.Logger
PingPeriod time.Duration // how often we should ping a neighbouring replica if it is silent
PingWait time.Duration // how much time we wait until pong received
PebbleWriteOptions *pebble.WriteOptions
BroadcastQueueMaxSize int // size in bytes, after reaching it all writes will block
BroadcastQueueMinBatchSize int // reads will wait until they have enough data or timelimit expires
// if this limit expires before read has enough data (BroadcastQueueMinBatchSize) it will return whatever it has,
// writes will cause overflow error which will result in queue shutdown and session end
BroadcastQueueTimeLimit time.Duration
ReadAccumTimeLimit time.Duration //
ReadMaxBufferSize int
ReadMinBufferSizeToProcess int
TcpReadBufferSize int
TcpWriteBufferSize int
WriteTimeout time.Duration
TlsConfig *tls.Config
MaxSyncDuration time.Duration
}
func (*Options) SetDefaults ¶
func (o *Options) SetDefaults()
type PebbleCollector ¶
type PebbleCollector struct {
// contains filtered or unexported fields
}
func NewPebbleCollector ¶
func NewPebbleCollector(db *pebble.DB) *PebbleCollector
func (*PebbleCollector) Collect ¶
func (pc *PebbleCollector) Collect(ch chan<- prometheus.Metric)
func (*PebbleCollector) Describe ¶
func (pc *PebbleCollector) Describe(ch chan<- *prometheus.Desc)
type PebbleMergeAdaptor ¶
type PebbleMergeAdaptor struct {
// contains filtered or unexported fields
}
func (*PebbleMergeAdaptor) MergeNewer ¶
func (a *PebbleMergeAdaptor) MergeNewer(value []byte) error
func (*PebbleMergeAdaptor) MergeOlder ¶
func (a *PebbleMergeAdaptor) MergeOlder(value []byte) error
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Provides common chotki error definitions.
|
Provides common chotki error definitions. |
|
Defines Host interfaces for Chotki
|
Defines Host interfaces for Chotki |
|
Package indexes provides the index management subsystem for Chotki.
|
Package indexes provides the index management subsystem for Chotki. |
|
Package network provides high-performance TCP/TLS transport for the Chotki protocol.
|
Package network provides high-performance TCP/TLS transport for the Chotki protocol. |
|
Implements a compact TLV (Type-Length-Value) encoding format optimized for efficiency.
|
Implements a compact TLV (Type-Length-Value) encoding format optimized for efficiency. |
|
Implements the Chotki distributed synchronization protocol.
|
Implements the Chotki distributed synchronization protocol. |