#!/bin/bash
# Script to copy a Cassandra keyspace to another using DSBulk
#
# Designed for Axway API Gateway keyspaces

### Definitions
# Working directory for helper files/scripts.
wrk_dir="$(pwd)/ks-copy"

# Where to store the dsbulk backups which will be restored to the new keyspace.
bkp_dir="${wrk_dir}/backup"

# Where to store dsbulk logs
log_dir="${wrk_dir}/logs"

# Source cluster connection settings. Leave user/password empty when the
# cluster does not require authentication.
src_ip="10.133.133.122"
src_port=""					# if omitted, default port will be assumed
src_usr=""
src_pwd=""
src_ks="xapim_ks"

# Destination cluster connection settings.
dst_ip="10.133.133.122"
dst_port=""					# if omitted, default port will be assumed
dst_usr=""
dst_pwd=""
dst_ks="xapim_ks"

# Destination replication strategy and factor. If unset or set to blank value, the replication factor of system_auth keyspace will be used.
#replication="'class': 'NetworkTopologyStrategy', 'dc1': '3'"

# Execution string for cqlsh. May be a plain binary path or a wrapper command
# such as the docker invocation below (the -v mount makes the working
# directory visible inside the container under the same path).
#cqlsh="/opt/axway/cassandra/bin/cqlsh"
#cqlsh="${HOME}/cqlsh/bin/cqlsh"
cqlsh="docker run -v ${wrk_dir}:${wrk_dir} --rm -it cassandra:4.0.12 cqlsh"

# Command line arguments for cqlsh, such as connect-timeout, request-timeout etc. Add any extra cqlsh arguments here.
# Do not add any arguments related to host, port or credentials as they would be in conflict
cqlsh_args="--request-timeout 120 --no-color"			# Common command line arguments (both source and destination) for cqlsh
cqlsh_src=""											# Extra cqlsh arguments for source, for example cqlshrc
cqlsh_dst=""											# Extra cqlsh arguments for destination, for example cqlshrc

# Path to dsbulk. Like cqlsh above, this may be a wrapper command; both the
# working and backup directories are mounted so dsbulk can read/write them.
#dsbulk="${HOME}/dsbulk-1.11.0/bin/dsbulk"
dsbulk="docker run -v ${wrk_dir}:${wrk_dir} -v ${bkp_dir}:${bkp_dir} --rm -it datastax/cassandra-data-migrator:4.3.6 /assets/dsbulk/bin/dsbulk"

# Command line arguments for dsbulk. Add any extra dsbulk arguments here.
# Do not add any arguments related to host, port or credentials as they would be in conflict
# Common command line arguments for dsbulk.
dsbulk_args="--connector.csv.maxCharsPerColumn -1"							# The maximum number of characters that a field can contain. Set to -1 so buffers are resized dynamically.
#dsbulk_args="${dsbulk_args} --dsbulk.engine.maxConcurrentQueries 2"		# The maximum number of concurrent queries that should be carried in parallel.
dsbulk_args="${dsbulk_args} --dsbulk.schema.preserveTimestamp true"			# Preserve cell timestamps when loading and unloading.
dsbulk_args="${dsbulk_args} --dsbulk.schema.preserveTtl true"				# Preserve cell TTLs when loading and unloading.

dsbulk_src="-cl LOCAL_ONE "							    					# Extra dsbulk arguments for source, such as consistency level
dsbulk_dst="-cl LOCAL_QUORUM --executor.continuousPaging.enabled false"		# Extra dsbulk arguments for destination, for example consistency level

# Action in case destination keyspace exists.
# Valid options are d=drop, m=merge, a=abort.
# Setting to anything else will prompt the user at runtime.
# Use merge option with caution, it does not update existing table schema and imports may fail.
dst_ks_action=""

### Function definitions
# Check the defined executables.
# Bug fix: the previous `[ ! -x $(which ${cqlsh}) ]` never triggered — `which`
# fails on multi-word wrapper commands (e.g. "docker run ... cqlsh") and the
# empty substitution degraded the test to `[ ! -x ]`, which is always false.
# We now validate the first word of each command string, so wrappers are
# checked by their launcher binary.
check_exe () {
	local exe

	# Check if cqlsh (or its wrapper, e.g. docker) exists and can be executed
	exe=$(command -v "${cqlsh%% *}" 2>/dev/null)
	if [ -z "${exe}" ] || [ ! -x "${exe}" ]; then
		echo -e "\033[0mcqlsh \033[0;31mnot found or not executable, please check \033[0mcqlsh\033[0;31m path.\033[0m"
		exit 1
	fi

	# Check if dsbulk (or its wrapper) exists and can be executed.
	# Also fixes the broken ANSI escape \033[0dsbulk (missing 'm') in the message.
	exe=$(command -v "${dsbulk%% *}" 2>/dev/null)
	if [ -z "${exe}" ] || [ ! -x "${exe}" ]; then
		echo -e "\033[0mdsbulk\033[0;31m not found or not executable, please check \033[0mdsbulk\033[0;31m path.\033[0m"
		echo "DSBulk may be downloaded from https://github.com/datastax/dsbulk/releases"
		#curl https://github.com/datastax/dsbulk/releases/download/1.11.0/dsbulk-1.11.0.tar.gz -OJ
		exit 2
	fi
}

# Validate that the source and destination keyspace names are between
# 1 and 48 characters long; exits 3 (source) or 4 (destination) otherwise.
check_ks () {
	local length

	length=${#src_ks}
	if (( length < 1 || length > 48 )); then
		echo -e "\033[0;31mSource keyspace name must not be empty or more than 48 characters long. Check the definition.\033[0m"
		exit 3
	fi

	length=${#dst_ks}
	if (( length < 1 || length > 48 )); then
		echo -e "\033[0;31mDestination keyspace name must not be empty or more than 48 characters long. Check the definition.\033[0m"
		exit 4
	fi
}

# Check if working and backup directories are set and set them if not.
# Bug fix: the fallback used "${pwd}" (an unset variable, expanding to empty)
# instead of the $(pwd) command substitution; mkdir's argument is now quoted
# so paths containing spaces work.
check_dirs () {
	# Fall back to the current directory when no working directory is defined.
	if [ -z "${wrk_dir}" ]; then
		wrk_dir="$(pwd)"
	fi

	if [ ! -d "${wrk_dir}" ]; then
		mkdir -p "${wrk_dir}" || { echo -e "\033[0;31mDirectory \033[0m${wrk_dir}\033[0;31m can't be created, please check.\033[0m"; exit 1; }
	fi

	# Default the backup location to a subdirectory of the working directory.
	if [ -z "${bkp_dir}" ]; then
		bkp_dir="${wrk_dir}/backup"
	fi
}

# Function to check if a keyspace exists.
# $1 - cqlsh connection arguments (host/port/credentials), $2 - keyspace name.
# Prints grep's exit status on stdout (0 = keyspace found) so callers can
# capture the result with $( ) instead of relying on the return code.
ks_exists () {
    # Query both schema tables: system_schema.keyspaces (newer Cassandra) and
    # system.schema_keyspaces (older releases); whichever exists emits rows
    # while the other COPY fails harmlessly.
    #   grep -v ":"           drops cqlsh status/error lines (presumably these contain colons)
    #   grep -vE '\bsystem…'  filters out the built-in system* keyspaces
    #   grep -e "\b$2\b"      matches the requested keyspace as a whole word
    ${cqlsh} $1 ${cqlsh_args} -e "copy system_schema.keyspaces (keyspace_name) to stdout; copy system.schema_keyspaces (keyspace_name) to stdout;" | grep -v ":" | grep -vE '\bsystem(\b|_.*)' | grep -e "\b"$2"\b" >/dev/null 
    echo $?
}

# Retrieve the Cassandra release number via cqlsh's 'SHOW VERSION'.
# $1 - cqlsh connection arguments. Prints the version (e.g. "4.0.12") on
# stdout; prints an empty line when the server cannot be reached.
cass_ver () {
	local raw version
	raw=$(${cqlsh} $1 ${cqlsh_args} -e 'SHOW VERSION;' 2>/dev/null)
	# Keep the "| Cassandra x.y.z |" banner field and cut it down to the number.
	version=$(printf '%s\n' "${raw}" | grep 'Cassandra' | cut -d'|' -f2 | cut -d' ' -f3)
	printf '%s\n' "${version}"
}

# Function for escaping regex metacharacters (] [ \ / . ^ $ *) so a value can
# be embedded safely inside the sed/perl substitution patterns built below.
# Bug fix: the previous `echo "$escaped_string"` mangled values that look like
# echo options (e.g. "-n"); printf '%s\n' emits any value verbatim.
escape () {
    local raw="$1"
    # sed prefixes every matched metacharacter with a backslash.
    printf '%s\n' "$raw" | sed 's/[][\/.^$*]/\\&/g'
}

### Execution
check_exe
check_ks
check_dirs

# Build the per-side cqlsh/dsbulk argument strings from the connection
# settings. Note that cqlsh takes host and port as positional arguments while
# dsbulk uses -h/-port flags. The strings are deliberately left unquoted at
# their call sites so they word-split into individual arguments.
# NOTE(review): passing -p <password> on the command line is visible in `ps`
# output; consider cqlshrc / dsbulk config files for production use.
if [ -n "${src_usr}" ]; then
	cqlsh_src="$cqlsh_src -u ${src_usr}"
	dsbulk_src="$dsbulk_src -u ${src_usr}"
	if [ -n "${src_pwd}" ]; then
		cqlsh_src="$cqlsh_src -p ${src_pwd}"
		dsbulk_src="$dsbulk_src -p ${src_pwd}"
	fi
fi

if [ -n "${src_ip}" ]; then
	cqlsh_src="$cqlsh_src ${src_ip}"
	dsbulk_src="$dsbulk_src -h ${src_ip}"
	# Port is only meaningful when a host was given (cqlsh positional order).
	if [ -n "${src_port}" ]; then
		cqlsh_src="$cqlsh_src ${src_port}"
		dsbulk_src="$dsbulk_src -port ${src_port}"
	fi
fi

if [ -n "${dst_usr}" ]; then
	cqlsh_dst="$cqlsh_dst -u ${dst_usr}"
	dsbulk_dst="$dsbulk_dst -u ${dst_usr}"
	if [ -n "${dst_pwd}" ]; then
		cqlsh_dst="$cqlsh_dst -p ${dst_pwd}"
		dsbulk_dst="$dsbulk_dst -p ${dst_pwd}"
	fi
fi

if [ -n "${dst_ip}" ]; then
	cqlsh_dst="$cqlsh_dst ${dst_ip}"
	dsbulk_dst="$dsbulk_dst -h ${dst_ip}"
	if [ -n "${dst_port}" ]; then
		cqlsh_dst="$cqlsh_dst ${dst_port}"
		dsbulk_dst="$dsbulk_dst -port ${dst_port}"
	fi
fi

# Retrieve Cassandra versions. An empty result doubles as the connectivity
# check: cass_ver prints nothing when the host is unreachable.
echo -e "\033[0;32mChecking Cassandra versions...\033[0m"
src_ver=$(cass_ver "${cqlsh_src}")
if [ -z "${src_ver}" ]; then
	echo -e "\033[0;31mUnable to retrieve source Cassandra version. Please check source server and port definition or network connection.\033[0m"
	exit 5
fi

echo -e "\033[0;36mSource Cassandra host:\033[0m ${src_ip} \033[0;36mkeyspace:\033[0m ${src_ks} \033[0;36mversion:\033[0m ${src_ver}" 

dst_ver=$(cass_ver "${cqlsh_dst}")
if [ -z "${dst_ver}" ]; then
	echo -e "\033[0;31mUnable to retrieve destination Cassandra version. Please check destination server and port definition or network connection.\033[0m"
	exit 6
fi

echo -e "\033[0;36mTarget Cassandra host:\033[0m ${dst_ip} \033[0;36mkeyspace:\033[0m ${dst_ks} \033[0;36mversion:\033[0m ${dst_ver}" 

# Set replication if not defined: copy the replication settings of the
# destination's system_auth keyspace. The schema table location depends on the
# destination's major version (system.schema_keyspaces on 2.x and older,
# system_schema.keyspaces from 3.0 on), hence the version switch.
if [ -z "${replication}" ]; then
	#replication=$(${cqlsh} ${cqlsh_dst} ${cqlsh_args} -e 'desc system_auth;' 2>/dev/null | grep 'KEYSPACE' | sed -E 's/.*\{(.*)\}.*/\1/g')
	if [ $(echo ${dst_ver} | cut -d'.' -f1) -le 2 ]; then
		replication=$(${cqlsh} ${cqlsh_dst} ${cqlsh_args} -e 'select * from system.schema_keyspaces ;' 2>/dev/null | grep 'system_auth' | sed -E 's/.*\{(.*)\}.*/\1/g')
	else
		replication=$(${cqlsh} ${cqlsh_dst} ${cqlsh_args} -e 'select * from system_schema.keyspaces ;' 2>/dev/null | grep 'system_auth' | sed -E 's/.*\{(.*)\}.*/\1/g')
	fi
fi

# Last-resort fallback: '\2' is a perl backreference, so the schema rewrite in
# perl_schema below substitutes the captured group back in — i.e. the source
# keyspace's original replication block is kept unchanged.
if [ -z "${replication}" ]; then
	replication='\2'
fi

# Check if the source keyspace exists
echo -e "\033[0;32mGetting source keyspace schema...\033[0m"
if [ "$(ks_exists "${cqlsh_src}" ${src_ks})" -ne 0 ]; then
    echo -e "\033[0;31mUnable to contact source host or source keyspace ${src_ks} doesn't exist. Check the variable definitions.\033[0m"
    exit 7
fi

# Get source keyspace schema definition, stripping cqlsh WARNING lines.
${cqlsh} ${cqlsh_src} ${cqlsh_args} -e "describe ${src_ks};" 2>/dev/null | perl -0pe 's/^WARNING.*?$//gm;' > "${wrk_dir}/src_ks.cql"
# Bug fix: plain $? only reflected the last pipeline stage (perl), which
# almost always succeeds, so cqlsh failures went unnoticed. PIPESTATUS[0]
# holds cqlsh's own exit status.
if [ "${PIPESTATUS[0]}" -ne 0 ]; then
    echo -e "\033[0;31mError getting keyspace. Check source keyspace definition.\033[0m"
    exit 3
fi

# Define Perl subroutine for processing the schema file.
# The string below is a perl program (run later with perl -0pe, which slurps
# the whole file as one string, so the /gms modifiers can match across lines).
# The shell interpolates ${src_ks}, ${dst_ks} and ${replication} into the
# program text before perl sees it; the '#' lines inside are perl comments,
# not shell comments.
perl_schema="
	# Replace source keyspace name with destination keyspace
	s/${src_ks}/${dst_ks}/g;

	# Change replication factor to match destination
	s/(WITH|AND) replication = {(.*)}/\1 replication = {${replication}}/g;

	# Add IF EXISTS to avoid conflicts when/if merging
	s/(CREATE (KEYSPACE|TABLE|INDEX))/\1 IF NOT EXISTS/g;

	# Remove compact storage
	s/WITH COMPACT STORAGE.*?AND/WITH/gms;

	# Remove clustering order
	s/.*CLUSTERING ORDER BY.*\n//g;

	# Set speculative retry to 99 percentile
	s/speculative_retry = '(NONE|99p)'/speculative_retry = '99PERCENTILE'/g;
"

# Append version-specific rewrites: the destination's major version decides
# which table properties are valid there (the inner '#' lines are perl
# comments inside the program string, not shell comments).
if [ $(echo ${dst_ver} | cut -d'.' -f1) -le 3 ]; then
	perl_schema="${perl_schema}
	# Set read repair chance to 0.0
	s/((dclocal_|)read_repair_chance) = (1\.0|0\.[1-9]+|0\.0[1-9])/\1 = 0.0/g;

	# Remove properties introduced in 4.0 such as read_repair, additional_write_policy (same as speculative_retry)
	s/WITH (additional_write_policy|read_repair) .*?AND/WITH/gms;
	s/.*AND (additional_write_policy|read_repair).*\n//g;"
else
	perl_schema="${perl_schema}
	# Remove read_repair_chance as it's deprecated in 4.0
	s/WITH ((dclocal_|)read_repair_chance) = (1\.0|0\.[1-9]+|0\.0[1-9]).*?AND/WITH/gms;
	s/.*AND ((dclocal_|)read_repair_chance).*\n//g;"
fi

# Prepare destination keyspace definition helper file.
# perl reads the schema file directly (the previous `cat | perl` pipeline was
# a useless use of cat) and the paths are quoted so a working directory
# containing spaces no longer breaks the redirection.
echo -e "\033[0;32mPreparing destination keyspace schema definition...\033[0m"
perl -0pe "${perl_schema}" "${wrk_dir}/src_ks.cql" > "${wrk_dir}/dst_ks.cql"

# Define a Perl subroutine to process each table individually for unloading.
# Note: the $(escape ...) command substitutions run NOW, during assignment,
# baking regex-safe copies of the dsbulk command and directory paths into the
# perl program. In the replacement, \2 and \3 are the keyspace and table name
# captured from each CREATE TABLE statement.
perl_unload="
	# Remove CREATE INDEX and KEYSPACE statements
	s/^CREATE (INDEX|KEYSPACE).*$//gm;
	
	# Write dsbulk commands
	s/(CREATE TABLE (.*?)\.(.*?) \((.*?)\) WITH.*?;)/$(escape "${dsbulk}") unload ${dsbulk_src} ${dsbulk_args} -logDir $(escape "${log_dir}") -url $(escape "${bkp_dir}")\/\3 -k \2 -t \3/gms;

	# Remove CR
	s/\r//gm;

	# Remove empty lines
	s/^\n//gm;
"

echo -e "\033[0;32mPreparing DSBulk unload script...\033[0m"
# Prepare dsbulk unload script for saving the source data.
# perl reads the schema file directly (drops the useless `cat` pipeline) and
# the paths are quoted so directories with spaces work.
perl -0pe "${perl_unload}" "${wrk_dir}/src_ks.cql" > "${wrk_dir}/dsbulk_unload.bash"

# Define a Perl subroutine to process each table individually for loading.
# Each CREATE TABLE statement becomes a guarded dsbulk load command: the
# escaped `[ $(ls -1 ... | wc -l) -ne 0 ] &&` prefix makes the generated
# script skip tables whose unload produced no backup files. As with
# perl_unload, the $(escape ...) substitutions are evaluated at assignment.
perl_load="
	# Remove CREATE INDEX and KEYSPACE statements
	s/^CREATE (INDEX|KEYSPACE).*$//gm;

	# Write dsbulk commands
	s/(CREATE TABLE (.*?)\.(.*?) \((.*?)\) WITH.*?;)/\[ \\$\(ls -1 $(escape "${bkp_dir}")\/\3 2>\/dev\/null | wc -l\) -ne 0 \] && $(escape "${dsbulk}") load ${dsbulk_dst} ${dsbulk_args} -logDir $(escape "${log_dir}") -url $(escape "${bkp_dir}")\/\3 -k \2 -t \3/gms;

	# Remove CR
	s/\r//gm;

	# Remove empty lines
	s/^\n//gm;
"

echo -e "\033[0;32mPreparing DSBulk load script...\033[0m"
# Prepare dsbulk load script for restoring the saved data into the new
# keyspace. perl reads the schema file directly (drops the useless `cat`
# pipeline) and the paths are quoted so directories with spaces work.
perl -0pe "${perl_load}" "${wrk_dir}/src_ks.cql" > "${wrk_dir}/dsbulk_load.bash"

# Check if destination keyspace exists and decide next action.
# dst_ks_action may be preconfigured (a=abort, d=drop, m=merge); any other
# value falls through to the interactive prompt, whose inner case repeats the
# same three handlers for the answer typed by the user.
if [ "$(ks_exists "${cqlsh_dst}" ${dst_ks})" -eq 0 ]; then
    echo -e "\033[0;31mDestination keyspace \033[0m${dst_ks}\033[0;31m already exists.\033[0m"
    case $dst_ks_action in
        a) echo -e "\033[0;32mAborting...\033[0m"
            exit 0
            ;;
        d) echo -e "\033[0;32mDropping existing keyspace...\033[0m"
            ${cqlsh} ${cqlsh_dst} ${cqlsh_args} -e "DROP KEYSPACE IF EXISTS ${dst_ks};" >/dev/null 2>/dev/null
            ;;
        m) # Merge: keep the keyspace; the generated schema uses IF NOT EXISTS
           # so existing tables are left untouched (their definitions are NOT updated).
            echo -e "\033[0;32mMerging keyspace with backup...\033[0m"
            ;;
        *)
        echo -e "You may choose one of the following:\033[0m"
        echo -e "\033[0;32md\033[0m) Drop existing keyspace and create a new one (\033[0;31mCAUTION\033[0m: existing data will be lost)\033[0m"
        echo -e "\033[0;32mm\033[0m) Merge data into existing keyspace (\033[0;31mCAUTION\033[0m: existing table definitions will not be updated and import may fail or be incomplete)\033[0m"
        echo -e "\033[0;32ma\033[0m) Abort the keyspace copy operation\033[0m"
        # Re-prompt until a valid single-letter answer is given.
        until [[ "$dst_ks_action" =~ ^(d|m|a)$ ]]; do 
			echo -ne "How do you wish to proceed? \033[0;32m"
			read dst_ks_action
			echo -ne "\033[0m"
            case $dst_ks_action in
                a) echo -e "\033[0;32mAborting...\033[0m"
                    exit 0
                    ;;
                d) echo -e "\033[0;32mDropping existing keyspace...\033[0m"
                    ${cqlsh} ${cqlsh_dst} ${cqlsh_args} -e "DROP KEYSPACE IF EXISTS ${dst_ks};" >/dev/null 2>&1
                    ;;
                m) echo -e "\033[0;32mMerging keyspace with backup...\033[0m"
                    ;;
            esac
        done
        ;;
    esac
fi

# Create the new keyspace and tables if they do not exist
echo -e "\033[0;32mApplying destination keyspace schema...\033[0m"
${cqlsh} ${cqlsh_dst} ${cqlsh_args} -f "${wrk_dir}/dst_ks.cql" >/dev/null 2>&1

# Check if the destination keyspace was created
if [ "$(ks_exists "${cqlsh_dst}" ${dst_ks})" -ne 0 ]; then
	echo -e "\033[0;31mDestination keyspace was not created. Aborting.\033[0m"
	exit 8
fi

# Unload and load the data
echo -e "\033[0;32mBacking up the existing data with DSBulk unload...\033[0m"
# Deleting any existing backups. "${bkp_dir:?}" aborts the script instead of
# expanding to "rm -rf /*" should the variable be empty at this point, and
# quoting keeps paths with spaces intact.
if [ -n "${bkp_dir}" ]; then
	rm -rf -- "${bkp_dir:?}"/*
else 
	echo -e "\033[0;31mBackup directory not defined. Aborting.\033[0m"
	exit 9
fi

# Call the unload helper script
bash "${wrk_dir}/dsbulk_unload.bash"

# Call the load helper script
echo -e "\033[0;32mRestoring the data in the destination keyspace with DSBulk load...\033[0m"
bash "${wrk_dir}/dsbulk_load.bash"

echo -e "\033[0;32mKeyspace copying complete.\033[0m"