Documentation
¶
Index ¶
- Variables
- func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error)
- func GetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64) (utils.AvroFile, error)
- func GetColumnsTypeConversion() (*protos.ColumnsTypeConversionResponse, error)
- func GetMinVersion(name CHSetting) (chproto.Version, bool)
- func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType) (*protos.TableSchema, error)
- func ListSupportedTypeConversions() map[types.QValueKind][]string
- func NewCHSettingsString(version *chproto.Version, key CHSetting, val string) string
- func SetAvroStage(ctx context.Context, flowJobName string, syncBatchID int64, ...) error
- func ValidateClickHouseHost(ctx context.Context, chHost string, allowedDomainString string) error
- func ValidateS3(ctx context.Context, creds *utils.ClickHouseS3Credentials) error
- type CHSetting
- type CHSettingEntry
- type CHSettings
- type ClickHouseAvroSyncMethod
- func (s *ClickHouseAvroSyncMethod) CopyStageToDestination(ctx context.Context, avroFile utils.AvroFile) error
- func (s *ClickHouseAvroSyncMethod) SyncQRepObjects(ctx context.Context, config *protos.QRepConfig, ...) (int64, shared.QRepWarnings, error)
- func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(ctx context.Context, config *protos.QRepConfig, ...) (int64, shared.QRepWarnings, error)
- func (s *ClickHouseAvroSyncMethod) SyncRecords(ctx context.Context, env map[string]string, stream *model.QRecordStream, ...) (int64, error)
- type ClickHouseConnector
- func (c *ClickHouseConnector) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error
- func (c *ClickHouseConnector) CleanupSetupNormalizedTables(_ context.Context, _ any)
- func (c *ClickHouseConnector) Close() error
- func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error
- func (c *ClickHouseConnector) ConsolidateQRepPartitions(_ context.Context, config *protos.QRepConfig) error
- func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error)
- func (c *ClickHouseConnector) FinishSetupNormalizedTables(_ context.Context, _ any) error
- func (c *ClickHouseConnector) GetRawTableName(flowJobName string) string
- func (c *ClickHouseConnector) GetTableSchema(ctx context.Context, _env map[string]string, _version uint32, ...) (map[string]*protos.TableSchema, error)
- func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error)
- func (c *ClickHouseConnector) NormalizeRecords(ctx context.Context, req *model.NormalizeRecordsRequest) (model.NormalizeResponse, error)
- func (c *ClickHouseConnector) RemoveTableEntriesFromRawTable(ctx context.Context, req *protos.RemoveTablesFromRawTableInput) error
- func (c *ClickHouseConnector) RenameTables(ctx context.Context, req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error)
- func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string, flowJobName string, ...) error
- func (c *ClickHouseConnector) SetupNormalizedTable(ctx context.Context, tx any, config *protos.SetupNormalizedTableBatchInput, ...) (bool, error)
- func (*ClickHouseConnector) SetupQRepMetadataTables(_ context.Context, _ *protos.QRepConfig) error
- func (c *ClickHouseConnector) StartSetupNormalizedTables(_ context.Context) (any, error)
- func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName string) error
- func (c *ClickHouseConnector) SyncQRepObjects(ctx context.Context, config *protos.QRepConfig, ...) (int64, shared.QRepWarnings, error)
- func (c *ClickHouseConnector) SyncQRepRecords(ctx context.Context, config *protos.QRepConfig, ...) (int64, shared.QRepWarnings, error)
- func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error)
- func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error
- func (c *ClickHouseConnector) ValidateMirrorDestination(ctx context.Context, cfg *protos.FlowConnectionConfigsCore, ...) error
- type NormalizeQueryGenerator
Constants ¶
This section is empty.
Variables ¶
var CHSettingMinVersions = map[CHSetting]chproto.Version{ SettingJsonTypeEscapeDotsInKeys: {Major: 25, Minor: 8, Patch: 0}, SettingTypeJsonSkipDuplicatedPaths: {Major: 24, Minor: 8, Patch: 0}, SettingMaxTableSizeToDrop: {Major: 23, Minor: 12, Patch: 0}, }
CHSettingMinVersions maps setting names to their minimum required ClickHouse versions. If a minimum version is not specified, we assume the setting is available in all ClickHouse versions.
var NumericDestinationTypes = map[string]struct{}{
"String": {},
"Int256": {},
"UInt256": {},
}
var SupportedDestinationTypes = map[string][]types.TypeConversion{ "String": { types.NewTypeConversion( types.NumericToStringSchemaConversion, types.NumericToStringValueConversion, ), }, "Int256": { types.NewTypeConversion( types.NumericToInt256SchemaConversion, types.NumericToInt256ValueConversion, ), }, "UInt256": { types.NewTypeConversion( types.NumericToUInt256SchemaConversion, types.NumericToUInt256ValueConversion, ), }, }
This file handles the mapping for ClickHouse destination types and their corresponding TypeConversion implementations. A TypeConversion object contains two functions: one for schema conversion (QField) and one for value conversion (QValue). This allows the avro writer to stage the schema/data in the converted type format so that it can be successfully uploaded to the desired destination type in ClickHouse.
To add a type conversion:
(1) In flow/model/shared/type_converter.go: - implement a SchemaConversionFn interface to convert the QField type - implement a ValueConversionFn interface to convert the QValue data (2) Add the new conversion to the `SupportedDestinationTypes` map here (if the destination type doesn't exist, create a new map entry for it).
Functions ¶
func Connect ¶
func Connect(ctx context.Context, env map[string]string, config *protos.ClickhouseConfig) (clickhouse.Conn, error)
func GetAvroStage ¶
func GetColumnsTypeConversion ¶
func GetColumnsTypeConversion() (*protos.ColumnsTypeConversionResponse, error)
func GetTableSchemaForTable ¶
func GetTableSchemaForTable(tm *protos.TableMapping, columns []driver.ColumnType) (*protos.TableSchema, error)
func ListSupportedTypeConversions ¶
func ListSupportedTypeConversions() map[types.QValueKind][]string
ListSupportedTypeConversions returns the full list of supported type conversions. The keys are QValueKind to allow the implementation to be source-connector agnostic.
func NewCHSettingsString ¶
NewCHSettingsString is a one-liner helper function that generates an immutable settings string.
func SetAvroStage ¶
func ValidateClickHouseHost ¶
func ValidateS3 ¶
func ValidateS3(ctx context.Context, creds *utils.ClickHouseS3Credentials) error
Types ¶
type CHSetting ¶
type CHSetting string
const ( SettingAllowNullableKey CHSetting = "allow_nullable_key" SettingJsonTypeEscapeDotsInKeys CHSetting = "json_type_escape_dots_in_keys" SettingTypeJsonSkipDuplicatedPaths CHSetting = "type_json_skip_duplicated_paths" SettingThrowOnMaxPartitionsPerInsertBlock CHSetting = "throw_on_max_partitions_per_insert_block" SettingParallelDistributedInsertSelect CHSetting = "parallel_distributed_insert_select" SettingMaxTableSizeToDrop CHSetting = "max_table_size_to_drop" )
When adding a new ClickHouse setting to this list, check when the setting was introduced to ClickHouse and, if applicable, add a corresponding minimum supported version below to ensure queries on older versions of ClickHouse servers are not impacted. Important: if the setting causes breaking changes to existing PeerDB flows (not just ClickHouse compatibility), it must also be gated by PeerDB's internal version.
type CHSettingEntry ¶
type CHSettingEntry struct {
// contains filtered or unexported fields
}
type CHSettings ¶
type CHSettings struct {
// contains filtered or unexported fields
}
func NewCHSettings ¶
func NewCHSettings(version *chproto.Version, settings ...CHSettingEntry) *CHSettings
func (*CHSettings) Add ¶
func (sg *CHSettings) Add(key CHSetting, val string) *CHSettings
func (*CHSettings) String ¶
func (sg *CHSettings) String() string
String generates the settings string ' SETTINGS <key1> = <val1>, <key2> = <val2>, ...'. If a ClickHouse version is set in the CHSettings, settings that do not meet the ClickHouse version requirement are filtered out. Otherwise, all settings are included.
type ClickHouseAvroSyncMethod ¶
type ClickHouseAvroSyncMethod struct {
*ClickHouseConnector
// contains filtered or unexported fields
}
func NewClickHouseAvroSyncMethod ¶
func NewClickHouseAvroSyncMethod( config *protos.QRepConfig, connector *ClickHouseConnector, ) *ClickHouseAvroSyncMethod
func (*ClickHouseAvroSyncMethod) CopyStageToDestination ¶
func (*ClickHouseAvroSyncMethod) SyncQRepObjects ¶
func (s *ClickHouseAvroSyncMethod) SyncQRepObjects( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, stream *model.QObjectStream, ) (int64, shared.QRepWarnings, error)
func (*ClickHouseAvroSyncMethod) SyncQRepRecords ¶
func (s *ClickHouseAvroSyncMethod) SyncQRepRecords( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, stream *model.QRecordStream, ) (int64, shared.QRepWarnings, error)
func (*ClickHouseAvroSyncMethod) SyncRecords ¶
type ClickHouseConnector ¶
type ClickHouseConnector struct {
*metadataStore.PostgresMetadata
Config *protos.ClickhouseConfig
// contains filtered or unexported fields
}
func NewClickHouseConnector ¶
func NewClickHouseConnector( ctx context.Context, env map[string]string, config *protos.ClickhouseConfig, ) (*ClickHouseConnector, error)
func (*ClickHouseConnector) CleanupQRepFlow ¶
func (c *ClickHouseConnector) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error
CleanupQRepFlow function for clickhouse connector
func (*ClickHouseConnector) CleanupSetupNormalizedTables ¶
func (c *ClickHouseConnector) CleanupSetupNormalizedTables(_ context.Context, _ any)
func (*ClickHouseConnector) Close ¶
func (c *ClickHouseConnector) Close() error
func (*ClickHouseConnector) ConnectionActive ¶
func (c *ClickHouseConnector) ConnectionActive(ctx context.Context) error
func (*ClickHouseConnector) ConsolidateQRepPartitions ¶
func (c *ClickHouseConnector) ConsolidateQRepPartitions(_ context.Context, config *protos.QRepConfig) error
We need to implement the QRepConsolidateConnector interface so that CleanupQRepFlow is called. Otherwise we could have skipped this.
func (*ClickHouseConnector) CreateRawTable ¶
func (c *ClickHouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error)
func (*ClickHouseConnector) FinishSetupNormalizedTables ¶
func (c *ClickHouseConnector) FinishSetupNormalizedTables(_ context.Context, _ any) error
func (*ClickHouseConnector) GetRawTableName ¶
func (c *ClickHouseConnector) GetRawTableName(flowJobName string) string
GetRawTableName returns the raw table name for the given flow job name.
func (*ClickHouseConnector) GetTableSchema ¶
func (c *ClickHouseConnector) GetTableSchema( ctx context.Context, _env map[string]string, _version uint32, _system protos.TypeSystem, tableMappings []*protos.TableMapping, ) (map[string]*protos.TableSchema, error)
func (*ClickHouseConnector) GetVersion ¶
func (c *ClickHouseConnector) GetVersion(ctx context.Context) (string, error)
func (*ClickHouseConnector) NormalizeRecords ¶
func (c *ClickHouseConnector) NormalizeRecords( ctx context.Context, req *model.NormalizeRecordsRequest, ) (model.NormalizeResponse, error)
func (*ClickHouseConnector) RemoveTableEntriesFromRawTable ¶
func (c *ClickHouseConnector) RemoveTableEntriesFromRawTable( ctx context.Context, req *protos.RemoveTablesFromRawTableInput, ) error
func (*ClickHouseConnector) RenameTables ¶
func (c *ClickHouseConnector) RenameTables( ctx context.Context, req *protos.RenameTablesInput, ) (*protos.RenameTablesOutput, error)
func (*ClickHouseConnector) ReplayTableSchemaDeltas ¶
func (c *ClickHouseConnector) ReplayTableSchemaDeltas( ctx context.Context, env map[string]string, flowJobName string, tableMappings []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta, ) error
func (*ClickHouseConnector) SetupNormalizedTable ¶
func (c *ClickHouseConnector) SetupNormalizedTable( ctx context.Context, tx any, config *protos.SetupNormalizedTableBatchInput, destinationTableIdentifier string, sourceTableSchema *protos.TableSchema, ) (bool, error)
func (*ClickHouseConnector) SetupQRepMetadataTables ¶
func (*ClickHouseConnector) SetupQRepMetadataTables(_ context.Context, _ *protos.QRepConfig) error
func (*ClickHouseConnector) StartSetupNormalizedTables ¶
func (c *ClickHouseConnector) StartSetupNormalizedTables(_ context.Context) (any, error)
func (*ClickHouseConnector) SyncFlowCleanup ¶
func (c *ClickHouseConnector) SyncFlowCleanup(ctx context.Context, jobName string) error
func (*ClickHouseConnector) SyncQRepObjects ¶
func (c *ClickHouseConnector) SyncQRepObjects( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, stream *model.QObjectStream, ) (int64, shared.QRepWarnings, error)
SyncQRepObjects syncs data from downloadable objects (URLs) to ClickHouse. Supports all ClickHouse formats based on the stream's Format field. Objects are batched by size limits to reduce INSERT operations.
func (*ClickHouseConnector) SyncQRepRecords ¶
func (c *ClickHouseConnector) SyncQRepRecords( ctx context.Context, config *protos.QRepConfig, partition *protos.QRepPartition, stream *model.QRecordStream, ) (int64, shared.QRepWarnings, error)
func (*ClickHouseConnector) SyncRecords ¶
func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error)
func (*ClickHouseConnector) ValidateCheck ¶
func (c *ClickHouseConnector) ValidateCheck(ctx context.Context) error
ValidateCheck performs some checks on the ClickHouse peer to ensure it will work for mirrors.
func (*ClickHouseConnector) ValidateMirrorDestination ¶
func (c *ClickHouseConnector) ValidateMirrorDestination( ctx context.Context, cfg *protos.FlowConnectionConfigsCore, tableNameSchemaMapping map[string]*protos.TableSchema, ) error
type NormalizeQueryGenerator ¶
type NormalizeQueryGenerator struct {
Query string
TableName string
// contains filtered or unexported fields
}
func NewNormalizeQueryGenerator ¶
func NewNormalizeQueryGenerator( tableName string, tableNameSchemaMapping map[string]*protos.TableSchema, tableMappings []*protos.TableMapping, endBatchID int64, lastNormBatchID int64, enablePrimaryUpdate bool, sourceSchemaAsDestinationColumn bool, env map[string]string, rawTableName string, chVersion *chproto.Version, cluster bool, configuredSoftDeleteColName string, version uint32, ) *NormalizeQueryGenerator
NewNormalizeQueryGenerator constructs a NormalizeQueryGenerator with required fields.
func (*NormalizeQueryGenerator) BuildQuery ¶
func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error)