Documentation
Overview ¶
Package journal implements WAL-like append-only journals. A journal is split into segments; the last segment is the one being written to.
Intended use cases:
- Database WAL files.
- Log files of various kinds.
- Archival of historical database records.
Features:
- Suitable for a large number of very short records. Per-record overhead can be as low as 2 bytes.
- Suitable for very large records, too. (In the future, it will be possible to write records in chunks.)
- Fault-resistant.
- Self-healing. Verifies the checksums and truncates corrupted data when opening the journal.
- Performant.
- Automatically rotates the files when they reach a certain size.
TODO:
- Trigger rotation based on time (say, each day gets a new segment). Basically, limit how old in-progress segments can be.
- Allow rotating a file without writing a new record. (Otherwise rarely-used journals will never get archived.)
- Give the work-in-progress file a prefixed name (W*).
- Auto-commit every N seconds, after K bytes, after M records.
- Option for millisecond timestamp precision?
- Reading API. (Search based on time and record ordinals.)
File format ¶
Segment files:
- file = segmentHeader item*
- segmentHeader = (see struct)
- item = record | commit
- record = (size << 1):uvarint timestampDelta:uvarint bytes*
- commit = checksum_with_bit_0_set:64
We always set bit 0 of commit checksums, and we use size*2 when encoding records; so bit 0 of the first byte of an item indicates whether it's a record or a commit.
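The bit-0 rule can be illustrated with a minimal sketch. The helper names (`isCommit`, `recordPrefix`, `commitBytes`) are hypothetical and not part of this package, and the little-endian byte order of the commit checksum is an assumption; the format description above does not state it.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// isCommit reports whether an item whose first byte is firstByte is a commit.
// Records encode size<<1, so bit 0 of their first byte is always clear.
func isCommit(firstByte byte) bool {
	return firstByte&1 == 1
}

// recordPrefix encodes the leading (size << 1) uvarint of a record.
// The first uvarint byte carries the low 7 bits of the value, so bit 0 is 0.
func recordPrefix(size uint64) []byte {
	return binary.AppendUvarint(nil, size<<1)
}

// commitBytes encodes a 64-bit commit with bit 0 forced to 1.
// ASSUMPTION: little-endian layout, so the low bit lands in the first byte.
func commitBytes(checksum uint64) []byte {
	return binary.LittleEndian.AppendUint64(nil, checksum|1)
}

func main() {
	fmt.Println(isCommit(recordPrefix(5)[0]))           // false
	fmt.Println(isCommit(recordPrefix(300)[0]))         // false
	fmt.Println(isCommit(commitBytes(0xDEADBEEF00)[0])) // true
}
```

A reader following this scheme only needs to peek at one byte to decide whether to parse a uvarint-prefixed record or a fixed 8-byte commit.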
Timestamps are 32-bit Unix times with 1-second precision. (The rationale is that timestamps are primarily used to search logs by time, which does not require higher precision. For high-frequency logs, 1-second precision means timestamp deltas will typically fit within 1 byte.)
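To make the 2-byte minimum overhead concrete, here is a rough sketch of record encoding following the grammar above. `encodeRecord` is a hypothetical helper, not the package's implementation, and it assumes the timestamp delta is taken relative to the previous record's timestamp.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeRecord appends one record in the documented layout:
// (size << 1) as a uvarint, the timestamp delta as a uvarint, then the payload.
// ASSUMPTION: the delta is measured from the previous record's timestamp.
func encodeRecord(dst []byte, prevTS, ts uint64, data []byte) []byte {
	dst = binary.AppendUvarint(dst, uint64(len(data))<<1)
	dst = binary.AppendUvarint(dst, ts-prevTS)
	return append(dst, data...)
}

func main() {
	payload := []byte("hello")
	rec := encodeRecord(nil, 1700000000, 1700000003, payload)
	// Overhead is one byte for the size and one byte for the delta.
	fmt.Println(len(rec) - len(payload)) // 2
}
```

Since a uvarint holds values below 128 in a single byte, the 2-byte overhead applies to payloads of up to 63 bytes written within 127 seconds of the reference timestamp; larger sizes or deltas add bytes as needed.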
Index ¶
- Constants
- Variables
- func MergedRecords(journals map[uint64]*Journal, filter Filter, fail func(error)) iter.Seq[RecordWithSource]
- func ParseTime(s string) (uint64, error)
- func TimeToStr(t time.Time) string
- func ToTime(ts uint64) time.Time
- func ToTimestamp(t time.Time) uint64
- type AutocommitOptions
- type AutorotateOptions
- type Cursor
- type Filter
- type Journal
- func (j *Journal) Autocommit(now uint64) (bool, error)
- func (j *Journal) Autorotate(now uint64) (bool, error)
- func (j *Journal) CanSeal() bool
- func (j *Journal) Commit() error
- func (j *Journal) FindSegments(filter Filter) ([]Segment, error)
- func (j *Journal) FinishWriting() error
- func (j *Journal) Initialize() error
- func (j *Journal) Now() uint64
- func (j *Journal) QuickSummary() (Summary, error)
- func (j *Journal) Read(filter Filter) *Cursor
- func (j *Journal) Records(filter Filter, fail func(error)) iter.Seq[Record]
- func (j *Journal) Rotate() error
- func (j *Journal) Seal(ctx context.Context) (Segment, error)
- func (j *Journal) SealAndTrimAll(ctx context.Context) (int, error)
- func (j *Journal) SealAndTrimOnce(ctx context.Context) (int, error)
- func (j *Journal) StartWriting()
- func (j *Journal) String() string
- func (j *Journal) Summary() (Summary, error)
- func (j *Journal) Trim() (Segment, error)
- func (j *Journal) WriteRecord(timestamp uint64, data []byte) error
- type Meta
- type Options
- type Record
- type RecordWithSource
- type Segment
- type Set
- func (set *Set) Add(j *Journal)
- func (set *Set) Autocommit(ctx context.Context) int
- func (set *Set) Autoseal(ctx context.Context) int
- func (set *Set) Journals() []*Journal
- func (set *Set) Process(ctx context.Context) int
- func (set *Set) Remove(j *Journal)
- func (set *Set) StartBackground(ctx context.Context) *SetRunner
- type SetOptions
- type SetRunner
- type Status
- type Summary
Constants ¶
const DefaultMaxFileSize = 10 * 1024 * 1024
Variables ¶
var (
	ErrIncompatible = fmt.Errorf("incompatible journal")
	ErrUnsupportedVersion = fmt.Errorf("unsupported journal version")
)
var ErrInternal = errors.New("journal internal error")
var ErrInvalidTimestamp = errors.New("invalid timestamp")
var ErrMissingSealKey = errors.New("missing seal key")
Functions ¶
func MergedRecords ¶ added in v0.0.9
func MergedRecords(journals map[uint64]*Journal, filter Filter, fail func(error)) iter.Seq[RecordWithSource]
MergedRecords returns an iterator that merges records from multiple journals, sorted by timestamp, then by source (for stability), then by record ID.
func ToTimestamp ¶
Types ¶
type AutocommitOptions ¶ added in v0.0.2
type AutorotateOptions ¶
type Journal ¶
type Journal struct {
// contains filtered or unexported fields
}
func (*Journal) FinishWriting ¶
func (*Journal) Initialize ¶
func (*Journal) QuickSummary ¶ added in v0.0.3
QuickSummary returns general information about the journal without actually reading the journal data. As a result, if the journal hasn't been opened yet, information about the last committed record will not be available.
func (*Journal) SealAndTrimAll ¶ added in v0.0.6
func (*Journal) SealAndTrimOnce ¶ added in v0.0.6
func (*Journal) StartWriting ¶
func (j *Journal) StartWriting()
type Options ¶
type Options struct {
	FileName string // e.g. "mydb-*.bin"
	MaxFileSize int64 // new segment after this size
	DebugName string
	Now func() time.Time
	JournalInvariant [32]byte
	SegmentInvariant [32]byte
	Autorotate AutorotateOptions
	Autocommit AutocommitOptions
	TrashPath string // optional; defaults to <dir>/trash
	Context context.Context
	Logger *slog.Logger
	Verbose bool
	VeryVerbose bool
	OnChange func()
	SealKeys []*sealer.Key
	SealOpts sealer.SealOptions
}
type RecordWithSource ¶ added in v0.0.9
type Segment ¶
type Segment struct {
// contains filtered or unexported fields
}
func (Segment) FirstRecord ¶ added in v0.0.3
func (Segment) RecordNumber ¶ added in v0.0.3
func (Segment) SegmentNumber ¶ added in v0.0.3
type Set ¶
type Set struct {
// contains filtered or unexported fields
}
func NewSet ¶
func NewSet(opt SetOptions) *Set
type SetOptions ¶
type Summary ¶ added in v0.0.3
type Summary struct {
	FirstSealedSegment Segment
	LastSealedSegment Segment
	FirstUnsealedSegment Segment
	LastUnsealedSegment Segment
	SegmentCount int
	LastCommitted Meta
	LastUncommitted Meta
}