Skip to content

Instantly share code, notes, and snippets.

@dacr
Created December 15, 2025 17:29
Show Gist options
  • Select an option

  • Save dacr/4b32cddf73ba5a96823aa79ab1c13cc7 to your computer and use it in GitHub Desktop.

Select an option

Save dacr/4b32cddf73ba5a96823aa79ab1c13cc7 to your computer and use it in GitHub Desktop.
Reindex user indices as required by opensearch 3 which no longer support opensearch 1.x indices / published by https://github.com/dacr/code-examples-manager #87e7c0d1-55da-4133-9f0e-da6ab7ac64fe/de043dcf55b5da39b9bcdf372bd04a429456e1b0
// summary : Reindex user indices as required by opensearch 3 which no longer support opensearch 1.x indices
// keywords : scala, opensearch, migration, reindex, upgrade
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 87e7c0d1-55da-4133-9f0e-da6ab7ac64fe
// created-on : 2025-05-26T00:04:05+02:00
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// attachments :
//> using scala "3.7.0"
//> using dep dev.zio::zio:2.1.18
//> using dep dev.zio::zio-json:0.7.43
//> using dep com.softwaremill.sttp.client4::zio:4.0.7
//> using dep com.softwaremill.sttp.client4::zio-json:4.0.7
import sttp.capabilities.zio.ZioStreams
import zio.*
import sttp.client4.*
import sttp.client4.httpclient.zio.HttpClientZioBackend
import sttp.client4.ziojson.*
import zio.json.JsonCodec
import zio.json.ast.Json
import java.util.UUID
object TrustAllCertificates {
import java.net.http.HttpClient
import java.security.SecureRandom
import java.security.cert.CertificateException
import java.security.cert.X509Certificate
import javax.net.ssl.{SSLContext, SSLEngine, SSLParameters, TrustManager, X509ExtendedTrustManager}
import java.net.Socket
// Create a trust manager that does not validate certificate chains and ignores hostname verification
val trustAllCerts: Array[TrustManager] = Array(new X509ExtendedTrustManager {
override def getAcceptedIssuers: Array[X509Certificate] = Array.empty
override def checkClientTrusted(certs: Array[X509Certificate], authType: String): Unit = {}
override def checkServerTrusted(certs: Array[X509Certificate], authType: String): Unit = {}
// These methods are called for hostname verification
override def checkClientTrusted(certs: Array[X509Certificate], authType: String, socket: Socket): Unit = {}
override def checkServerTrusted(certs: Array[X509Certificate], authType: String, socket: Socket): Unit = {}
override def checkClientTrusted(certs: Array[X509Certificate], authType: String, engine: SSLEngine): Unit = {}
override def checkServerTrusted(certs: Array[X509Certificate], authType: String, engine: SSLEngine): Unit = {}
})
// Install the all-trusting trust manager
val sslContext: SSLContext = SSLContext.getInstance("TLS")
sslContext.init(null, trustAllCerts, new SecureRandom())
// Create SSLParameters to disable hostname verification
val sslParameters = new SSLParameters()
sslParameters.setEndpointIdentificationAlgorithm(null) // Setting to null disables hostname verification
val httpClient: HttpClient = java.net.http.HttpClient
.newBuilder()
.sslContext(sslContext)
.sslParameters(sslParameters) // Apply SSLParameters to disable hostname verification
.build()
}
case class IndexInfo(index: String, uuid: String, status: String, health: String) derives JsonCodec
case class IndexSettings(index: String, creation_date_string: String, created_string: String, upgraded_string: Option[String]) derives JsonCodec
case class ReindexSource(index: String) derives JsonCodec
case class ReindexDest(index: String, op_type: Option[String] = None) derives JsonCodec
case class ReindexRequestPayload(source: ReindexSource, dest: ReindexDest) derives JsonCodec
case class ReindexFailureItem(
index: String,
`type`: Option[String], // type is a keyword, use backticks
id: String,
cause: Map[String, Json], // Cause can be complex
status: Int
) derives JsonCodec
case class ReindexResponse(
took: Long,
timed_out: Boolean,
total: Long,
updated: Long,
created: Long,
deleted: Long,
batches: Long,
version_conflicts: Long,
noops: Long,
retries: Map[String, Long],
throttled_millis: Long,
requests_per_second: Float,
throttled_until_millis: Long,
failures: List[ReindexFailureItem]
) derives JsonCodec
case class GenericResponse(acknowledged: Boolean) derives JsonCodec
val app = for {
backend <- HttpClientZioBackend.scopedUsingClient(TrustAllCertificates.httpClient)
searchURL <- System.env("CEM_ELASTIC_URL").someOrElse("https://192.168.1.223:9200")
username <- System.env("CEM_ELASTIC_USERNAME").someOrElse("admin")
password <- System.env("CEM_ELASTIC_PASSWORD").someOrFail(s"NO PASSWORD PROVIDED for $username")
requestIndices = basicRequest.auth
.basic(username, password)
.get(uri"$searchURL/_cat/indices?format=json&h=index,uuid,status,health")
.response(asJson[List[IndexInfo]])
_ <- Console.printLine(s"Starting reindexing $searchURL $username")
indices <- backend.send(requestIndices).map(_.body).absolve
_ <- Console.printLine(s"Found ${indices.length} index to process")
_ <- ZIO.foreachDiscard(indices)(index => Console.printLine(index))
requestSettings =
basicRequest.auth
.basic(username, password)
.get(
uri"""$searchURL/_settings?human&flat_settings&filter_path=*.settings.index\.version\.created_string,*.settings.index\.version\.upgraded_string,*.settings.index\.creation_date_string"""
)
.response(asJson[Map[String, Map[String, Map[String, String]]]])
rawSettings <- backend.send(requestSettings).map(_.body).absolve
settings = rawSettings.map((index, indexSettings) =>
IndexSettings(
index,
indexSettings("settings")("index.creation_date_string"), // UNSAFE
indexSettings("settings")("index.version.created_string"), // UNSAFE
indexSettings("settings").get("index.version.upgraded_string") // UNSAFE
)
)
_ <- Console.printLine("------------ to reindex ------------")
toReindex = settings.toList.sortBy(_.index).filter(_.created_string.startsWith("1."))
_ <- ZIO.foreachDiscard(toReindex)(indexSettings => Console.printLine(indexSettings))
_ <- ZIO.foreachDiscard(toReindex) { toReindex =>
for {
_ <- Console.printLine("--------------------------------------------------")
sourceIndex = toReindex.index
targetIndex = s"${sourceIndex}_reindexed"
_ <- Console.printLine(s"Reindexing $sourceIndex")
payload = ReindexRequestPayload(
source = ReindexSource(index = sourceIndex),
dest = ReindexDest(index = targetIndex)
)
requestReindex = basicRequest.auth
.basic(username, password)
.post(uri"$searchURL/_reindex?wait_for_completion=true")
.body(asJson(payload))
.response(asJson[ReindexResponse])
reindexResponse <- backend.send(requestReindex).map(_.body).absolve
_ <- Console.printLine(s"reindex $sourceIndex : failuresCount=${reindexResponse.failures.size}")
requestDelete = basicRequest.auth
.basic(username, password)
.delete(uri"$searchURL/$sourceIndex")
.response(asJson[GenericResponse])
deleteResponse <- backend.send(requestDelete).map(_.body).absolve
_ <- Console.printLine(s"delete $sourceIndex response = $deleteResponse")
renamePayload = ReindexRequestPayload(
source = ReindexSource(index = targetIndex),
dest = ReindexDest(index = sourceIndex)
)
requestRename = basicRequest.auth
.basic(username, password)
.post(uri"$searchURL/_reindex?wait_for_completion=true")
.body(asJson(renamePayload))
.response(asJson[ReindexResponse])
renameResponse <- backend.send(requestRename).map(_.body).absolve
_ <- Console.printLine(s"rename $sourceIndex : failuresCount=${renameResponse.failures.size}")
requestDelete2 = basicRequest.auth
.basic(username, password)
.delete(uri"$searchURL/$targetIndex")
.response(asJson[GenericResponse])
deleteResponse2 <- backend.send(requestDelete2).map(_.body).absolve
_ <- Console.printLine(s"delete $targetIndex response = $deleteResponse2")
} yield ()
}
} yield ()
Unsafe.unsafe(implicit u => Runtime.default.unsafe.run(app.logError.provide(Scope.default)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment