connclickhouse

package
v0.0.0-...-6f847f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2026 License: AGPL-3.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var CHSettingMinVersions = map[CHSetting]chproto.Version{
	SettingJsonTypeEscapeDotsInKeys:    {Major: 25, Minor: 8, Patch: 0},
	SettingTypeJsonSkipDuplicatedPaths: {Major: 24, Minor: 8, Patch: 0},
	SettingMaxTableSizeToDrop:          {Major: 23, Minor: 12, Patch: 0},
}

CHSettingMinVersions maps setting names to their minimum required ClickHouse versions. If a setting has no minimum version listed, it is assumed to be available in all ClickHouse versions.

View Source
var NumericDestinationTypes = map[string]struct{}{
	"String":  {},
	"Int256":  {},
	"UInt256": {},
}

This file handles the mapping between ClickHouse destination types and their corresponding TypeConversion implementations. A TypeConversion object contains two functions: one for schema conversion (QField) and one for value conversion (QValue). This allows the Avro writer to stage the schema and data in the converted type format, so that they can be successfully uploaded to the desired destination type in ClickHouse.

To add a type conversion:

(1) In flow/model/shared/type_converter.go:
- implement a SchemaConversionFn interface to convert the QField type
- implement a ValueConversionFn interface to convert the QValue data

(2) Add the new conversion to the `supportedDestinationTypes` map here
	(if the destination type doesn't exist yet, create a new map entry for it).

Functions

func Connect

func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error)

func GetAvroStage

func GetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64) (utils.AvroFile, error)

func GetColumnsTypeConversion

func GetColumnsTypeConversion() (*protos.ColumnsTypeConversionResponse, error)

func GetMinVersion

func GetMinVersion(name CHSetting) (chproto.Version, bool)

func GetTableSchemaForTable

func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType) (*protos.TableSchema, error)

func ListSupportedTypeConversions

func ListSupportedTypeConversions() map[types.QValueKind][]string

ListSupportedTypeConversions returns the full list of supported type conversions. The keys are QValueKind values, which keeps the implementation source-connector agnostic.

func NewCHSettingsString

func NewCHSettingsString(version *chproto.Version, key CHSetting, val string) string

NewCHSettingsString is a convenience function that generates an immutable settings string for a single setting.

func SetAvroStage

func SetAvroStage(
	ctx context.Context,
	flowJobName string,
	syncBatchID int64,
	avroFile utils.AvroFile,
) error

func ValidateClickHouseHost

func ValidateClickHouseHost(ctx context.Context, chHost string, allowedDomainString string) error

func ValidateS3

func ValidateS3(ctx context.Context, creds *utils.ClickHouseS3Credentials) error

Types

type CHSetting

type CHSetting string
const (
	SettingAllowNullableKey                   CHSetting = "allow_nullable_key"
	SettingJsonTypeEscapeDotsInKeys           CHSetting = "json_type_escape_dots_in_keys"
	SettingTypeJsonSkipDuplicatedPaths        CHSetting = "type_json_skip_duplicated_paths"
	SettingThrowOnMaxPartitionsPerInsertBlock CHSetting = "throw_on_max_partitions_per_insert_block"
	SettingParallelDistributedInsertSelect    CHSetting = "parallel_distributed_insert_select"
	SettingMaxTableSizeToDrop                 CHSetting = "max_table_size_to_drop"
)

When adding a new ClickHouse setting to this list, check which ClickHouse version introduced it and, if applicable, add a corresponding minimum supported version to CHSettingMinVersions so that queries against older ClickHouse servers are not affected. Important: if the setting causes breaking changes to existing PeerDB flows (not just ClickHouse compatibility issues), it must also be gated by PeerDB's internal version.

type CHSettingEntry

type CHSettingEntry struct {
	// contains filtered or unexported fields
}

type CHSettings

type CHSettings struct {
	// contains filtered or unexported fields
}

func NewCHSettings

func NewCHSettings(version *chproto.Version, settings ...CHSettingEntry) *CHSettings

func (*CHSettings) Add

func (sg *CHSettings) Add(key CHSetting, val string) *CHSettings

func (*CHSettings) String

func (sg *CHSettings) String() string

String generates the settings string ' SETTINGS <key1> = <val1>, <key2> = <val2>, ...'. If a ClickHouse version is set in the CHSettings, settings that do not meet their ClickHouse version requirement are filtered out; otherwise, all settings are included.

type ClickHouseAvroSyncMethod

type ClickHouseAvroSyncMethod struct {
	*ClickHouseConnector
	// contains filtered or unexported fields
}

func NewClickHouseAvroSyncMethod

func NewClickHouseAvroSyncMethod(
	config *protos.QRepConfig,
	connector *ClickHouseConnector,
) *ClickHouseAvroSyncMethod

func (*ClickHouseAvroSyncMethod) CopyStageToDestination

func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile utils.AvroFile) error

func (*ClickHouseAvroSyncMethod) SyncQRepObjects

func (s *ClickHouseAvroSyncMethod) SyncQRepObjects(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QObjectStream,
) (int64, shared.QRepWarnings, error)

func (*ClickHouseAvroSyncMethod) SyncQRepRecords

func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int64, shared.QRepWarnings, error)

func (*ClickHouseAvroSyncMethod) SyncRecords

func (s *ClickHouseAvroSyncMethod) SyncRecords(
	ctx context.Context,
	env map[string]string,
	stream *model.QRecordStream,
	flowJobName string,
	syncBatchID int64,
) (int64, error)

type ClickHouseConnector

type ClickHouseConnector struct {
	*metadataStore.PostgresMetadata

	Config *protos.ClickhouseConfig
	// contains filtered or unexported fields
}

func NewClickHouseConnector

func NewClickHouseConnector(
	ctx context.Context,
	env map[string]string,
	config *protos.ClickhouseConfig,
) (*ClickHouseConnector, error)

func (*ClickHouseConnector) CleanupQRepFlow

func (c *ClickHouseConnector) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error

CleanupQRepFlow function for clickhouse connector

func (*ClickHouseConnector) CleanupSetupNormalizedTables

func (c *ClickHouseConnector) CleanupSetupNormalizedTables(_ context.Context, _ any)

func (*ClickHouseConnector) Close

func (c *ClickHouseConnector) Close() error

func (*ClickHouseConnector) ConnectionActive

func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error

func (*ClickHouseConnector) ConsolidateQRepPartitions

func (c *ClickHouseConnector) ConsolidateQRepPartitions(_ context.Context, config *protos.QRepConfig) error

ConsolidateQRepPartitions exists so that the connector implements the QRepConsolidateConnector interface, which ensures CleanQRepFlow is called. Otherwise, this method could have been skipped.

func (*ClickHouseConnector) CreateRawTable

func (*ClickHouseConnector) FinishSetupNormalizedTables

func (c *ClickHouseConnector) FinishSetupNormalizedTables(_ context.Context, _ any) error

func (*ClickHouseConnector) GetRawTableName

func (c *ClickHouseConnector) GetRawTableName(flowJobName string) string

GetRawTableName returns the raw table name for the given table identifier.

func (*ClickHouseConnector) GetTableSchema

func (c *ClickHouseConnector) GetTableSchema(
	ctx context.Context,
	_env map[string]string,
	_version uint32,
	_system protos.TypeSystem,
	tableMappings []*protos.TableMapping,
) (map[string]*protos.TableSchema, error)

func (*ClickHouseConnector) GetVersion

func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error)

func (*ClickHouseConnector) NormalizeRecords

func (*ClickHouseConnector) RemoveTableEntriesFromRawTable

func (c *ClickHouseConnector) RemoveTableEntriesFromRawTable(
	ctx context.Context,
	req *protos.RemoveTablesFromRawTableInput,
) error

func (*ClickHouseConnector) RenameTables

func (*ClickHouseConnector) ReplayTableSchemaDeltas

func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
	ctx context.Context,
	env map[string]string,
	flowJobName string,
	tableMappings []*protos.TableMapping,
	schemaDeltas []*protos.TableSchemaDelta,
) error

func (*ClickHouseConnector) SetupNormalizedTable

func (c *ClickHouseConnector) SetupNormalizedTable(
	ctx context.Context,
	tx any,
	config *protos.SetupNormalizedTableBatchInput,
	destinationTableIdentifier string,
	sourceTableSchema *protos.TableSchema,
) (bool, error)

func (*ClickHouseConnector) SetupQRepMetadataTables

func (*ClickHouseConnector) SetupQRepMetadataTables(_ context.Context, _ *protos.QRepConfig) error

func (*ClickHouseConnector) StartSetupNormalizedTables

func (c *ClickHouseConnector) StartSetupNormalizedTables(_ context.Context) (any, error)

func (*ClickHouseConnector) SyncFlowCleanup

func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName string) error

func (*ClickHouseConnector) SyncQRepObjects

func (c *ClickHouseConnector) SyncQRepObjects(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QObjectStream,
) (int64, shared.QRepWarnings, error)

SyncQRepObjects syncs data from downloadable objects (URLs) to ClickHouse. It supports all ClickHouse formats, selected via the stream's Format field. Objects are batched by size limits to reduce the number of INSERT operations.

func (*ClickHouseConnector) SyncQRepRecords

func (c *ClickHouseConnector) SyncQRepRecords(
	ctx context.Context,
	config *protos.QRepConfig,
	partition *protos.QRepPartition,
	stream *model.QRecordStream,
) (int64, shared.QRepWarnings, error)

func (*ClickHouseConnector) SyncRecords

func (*ClickHouseConnector) ValidateCheck

func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error

ValidateCheck performs checks on the ClickHouse peer to ensure it will work for mirrors.

func (*ClickHouseConnector) ValidateMirrorDestination

func (c *ClickHouseConnector) ValidateMirrorDestination(
	ctx context.Context,
	cfg *protos.FlowConnectionConfigsCore,
	tableNameSchemaMapping map[string]*protos.TableSchema,
) error

type NormalizeQueryGenerator

type NormalizeQueryGenerator struct {
	Query     string
	TableName string
	// contains filtered or unexported fields
}

func NewNormalizeQueryGenerator

func NewNormalizeQueryGenerator(
	tableName string,
	tableNameSchemaMapping map[string]*protos.TableSchema,
	tableMappings []*protos.TableMapping,
	endBatchID int64,
	lastNormBatchID int64,
	enablePrimaryUpdate bool,
	sourceSchemaAsDestinationColumn bool,
	env map[string]string,
	rawTableName string,
	chVersion *chproto.Version,
	cluster bool,
	configuredSoftDeleteColName string,
	version uint32,
) *NormalizeQueryGenerator

NewNormalizeQueryGenerator constructs a NormalizeQueryGenerator with the required fields.

func (*NormalizeQueryGenerator) BuildQuery

func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL