Replicated
The engine is based on the Atomic engine. It supports replication of metadata via a DDL log that is written to ZooKeeper and executed on all replicas of a given database.
One ClickHouse server can have multiple replicated databases running and updating at the same time, but a single server cannot host multiple replicas of the same replicated database.
Creating a Database
CREATE DATABASE testdb ENGINE = Replicated('zoo_path', 'shard_name', 'replica_name') [SETTINGS ...]
Engine Parameters
zoo_path: ZooKeeper path. The same ZooKeeper path corresponds to the same database.
shard_name: Shard name. Database replicas are grouped into shards by shard_name.
replica_name: Replica name. Replica names must be different for all replicas of the same shard.
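As a minimal sketch of how the three parameters fit together, the statement below creates one replica of a database. The ZooKeeper path /clickhouse/databases/testdb is a hypothetical choice, and the example assumes that the {shard} and {replica} macros are defined in the server configuration of each host, so that the same statement can be run unchanged on every server:

```sql
-- Hypothetical path; every replica of testdb must use the same zoo_path.
-- {shard} and {replica} are assumed to be defined as macros in each server's config.
CREATE DATABASE testdb
ENGINE = Replicated('/clickhouse/databases/testdb', '{shard}', '{replica}');

-- Inspect the engine definition the database was created with.
SHOW CREATE DATABASE testdb;
```

Hosts that resolve {shard} to the same value end up in the same shard, and each of them needs a distinct {replica} value.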
For ReplicatedMergeTree tables, if no arguments are provided, the default arguments are used: /clickhouse/tables/{uuid}/{shard} and {replica}. These can be changed in the server settings default_replica_path and default_replica_name. The macro {uuid} is expanded to the table's UUID, while {shard} and {replica} are expanded to values from the server config, not from the database engine arguments. In the future, it will be possible to use the shard_name and replica_name of the Replicated database.
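To see which path and replica name the defaults resolved to, one option is to create a table without engine arguments and look it up in system.replicas. This is a sketch only: the database testdb and the table events are illustrative names, and the values returned depend on the macros in your server config.

```sql
-- Illustrative table; no ZooKeeper path or replica name is given,
-- so default_replica_path and default_replica_name are used.
CREATE TABLE testdb.events (id UInt64, ts DateTime)
ENGINE = ReplicatedMergeTree
ORDER BY id;

-- Show the resolved ZooKeeper path and replica name for the table on this server.
SELECT database, table, zookeeper_path, replica_name
FROM system.replicas
WHERE database = 'testdb' AND table = 'events';
```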
Specifics and Recommendations
DDL queries with a Replicated database work in a similar way to ON CLUSTER queries, but with minor differences.
First, the DDL query is executed on the initiator (the host that originally received the query from the user). If it fails there, the user immediately receives an error and the other hosts do not try to execute it. If it succeeds on the initiator, all other hosts automatically retry until they complete it. The initiator waits for the query to complete on the other hosts (no longer than distributed_ddl_task_timeout) and returns a table with the query execution status on each host.
The behavior in case of errors is controlled by the distributed_ddl_output_mode setting. For a Replicated database it is better to set it to null_status_on_timeout: if some hosts did not have time to execute the query within distributed_ddl_task_timeout, no exception is thrown and the NULL status is shown for them in the table.
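The recommended mode can be set per session before running DDL. The following is a sketch under assumptions: the 60-second timeout is an arbitrary illustrative value, and testdb.metrics is a hypothetical table in a Replicated database.

```sql
-- Do not throw if some replicas are slow; report NULL status for them instead.
SET distributed_ddl_output_mode = 'null_status_on_timeout';

-- Illustrative value: wait at most 60 seconds for the other hosts.
SET distributed_ddl_task_timeout = 60;

-- Hypothetical table; the result set lists one row per host with its status.
CREATE TABLE testdb.metrics (name String, value Float64)
ENGINE = ReplicatedMergeTree
ORDER BY name;
```

Hosts that had not finished within the timeout should then appear in the status table with NULL in the status column rather than causing an exception.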
The system.clusters system table contains a cluster with the same name as the replicated database, which consists of all replicas of the database. This cluster is updated automatically when replicas are created or deleted, and it can be used for Distributed tables.
When a new replica of the database is created, it creates the tables by itself. If a replica has been unavailable for a long time and has lagged behind the replication log, it compares its local metadata with the current metadata in ZooKeeper, moves the extra tables with data to a separate non-replicated database (so as not to accidentally delete anything), creates the missing tables, and updates the names of tables that have been renamed. Data is replicated at the ReplicatedMergeTree level, i.e. if a table is not replicated, its data will not be replicated (the database is responsible only for metadata).
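The split between metadata and data can be illustrated with a plain MergeTree table inside a Replicated database. This is a sketch with hypothetical names (testdb, local_only); the comments describe what the paragraph above implies, not captured output.

```sql
-- Run on one replica. The CREATE goes through the database DDL log,
-- so the (empty) table is expected to appear on every replica.
CREATE TABLE testdb.local_only (n UInt64)
ENGINE = MergeTree
ORDER BY n;

-- MergeTree is not a replicated table engine, so this row stays on the
-- replica that received the INSERT.
INSERT INTO testdb.local_only VALUES (1);

-- Run on a different replica: the table exists, but the row inserted
-- elsewhere is not expected to be visible here.
SELECT count() FROM testdb.local_only;
```

Declaring the table with ReplicatedMergeTree instead would make the inserted row available on the other replicas of the same shard.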
ALTER TABLE FREEZE|ATTACH|FETCH|DROP|DROP DETACHED|DETACH PARTITION|PART queries are allowed but not replicated. The database engine will only add/fetch/remove the partition/part on the current replica. However, if the table itself uses a Replicated table engine, the data will be replicated after using ATTACH.
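A short sketch of two such queries follows. The table testdb.events and the backup name are hypothetical, and the table is assumed to have no partition key, which is why the partition expression is tuple().

```sql
-- Hypothetical unpartitioned replicated table.
CREATE TABLE IF NOT EXISTS testdb.events (id UInt64, ts DateTime)
ENGINE = ReplicatedMergeTree
ORDER BY id;

-- Not passed through the database DDL log: the snapshot is taken
-- only on the replica that runs the query.
ALTER TABLE testdb.events FREEZE WITH NAME 'before_cleanup';

-- Also executed only on the current replica by the database engine. Because the
-- table engine is replicated, parts attached from the local detached directory
-- are then replicated to the other replicas by the table itself.
ALTER TABLE testdb.events ATTACH PARTITION tuple();
```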
If you only need to configure a cluster without maintaining table replication, refer to the Cluster Discovery feature.
Usage Example
Creating a cluster with three hosts:
node1 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','replica1');
node2 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','shard1','other_replica');
node3 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','{replica}');
Running the DDL query:
CREATE TABLE r.rmt (n UInt64) ENGINE=ReplicatedMergeTree ORDER BY n;
┌─────hosts────────────┬──status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ shard1|replica1      │    0    │       │          2          │        0         │
│ shard1|other_replica │    0    │       │          1          │        0         │
│ other_shard|r1       │    0    │       │          0          │        0         │
└──────────────────────┴─────────┴───────┴─────────────────────┴──────────────────┘
Showing the system table:
SELECT cluster, shard_num, replica_num, host_name, host_address, port, is_local
FROM system.clusters WHERE cluster='r';
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r       │     1     │      1      │   node3   │  127.0.0.1   │ 9002 │     0    │
│ r       │     2     │      1      │   node2   │  127.0.0.1   │ 9001 │     0    │
│ r       │     2     │      2      │   node1   │  127.0.0.1   │ 9000 │     1    │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘
Creating a distributed table and inserting the data:
node2 :) CREATE TABLE r.d (n UInt64) ENGINE=Distributed('r','r','rmt', n % 2);
node3 :) INSERT INTO r.d SELECT * FROM numbers(10);
node1 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─host──┬─groupArray(n)─┐
│ node3 │  [1,3,5,7,9]  │
│ node2 │  [0,2,4,6,8]  │
└───────┴───────────────┘
Adding a replica on one more host:
node4 :) CREATE DATABASE r ENGINE=Replicated('some/path/r','other_shard','r2');
The cluster configuration will look like this:
┌─cluster─┬─shard_num─┬─replica_num─┬─host_name─┬─host_address─┬─port─┬─is_local─┐
│ r       │     1     │      1      │   node3   │  127.0.0.1   │ 9002 │     0    │
│ r       │     1     │      2      │   node4   │  127.0.0.1   │ 9003 │     0    │
│ r       │     2     │      1      │   node2   │  127.0.0.1   │ 9001 │     0    │
│ r       │     2     │      2      │   node1   │  127.0.0.1   │ 9000 │     1    │
└─────────┴───────────┴─────────────┴───────────┴──────────────┴──────┴──────────┘
The distributed table will also get data from the new host:
node2 :) SELECT materialize(hostName()) AS host, groupArray(n) FROM r.d GROUP BY host;
┌─host──┬─groupArray(n)─┐
│ node2 │  [1,3,5,7,9]  │
│ node4 │  [0,2,4,6,8]  │
└───────┴───────────────┘