Created
December 15, 2025 17:29
-
-
Save dacr/4b32cddf73ba5a96823aa79ab1c13cc7 to your computer and use it in GitHub Desktop.
Reindex user indices as required by opensearch 3 which no longer supports opensearch 1.x indices / published by https://github.com/dacr/code-examples-manager #87e7c0d1-55da-4133-9f0e-da6ab7ac64fe/de043dcf55b5da39b9bcdf372bd04a429456e1b0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // summary : Reindex user indices as required by opensearch 3 which no longer supports opensearch 1.x indices | |
| // keywords : scala, opensearch, migration, reindex, upgrade | |
| // publish : gist | |
| // authors : David Crosson | |
| // license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2) | |
| // id : 87e7c0d1-55da-4133-9f0e-da6ab7ac64fe | |
| // created-on : 2025-05-26T00:04:05+02:00 | |
| // managed-by : https://github.com/dacr/code-examples-manager | |
| // run-with : scala-cli $file | |
| // attachments : | |
| //> using scala "3.7.0" | |
| //> using dep dev.zio::zio:2.1.18 | |
| //> using dep dev.zio::zio-json:0.7.43 | |
| //> using dep com.softwaremill.sttp.client4::zio:4.0.7 | |
| //> using dep com.softwaremill.sttp.client4::zio-json:4.0.7 | |
| import sttp.capabilities.zio.ZioStreams | |
| import zio.* | |
| import sttp.client4.* | |
| import sttp.client4.httpclient.zio.HttpClientZioBackend | |
| import sttp.client4.ziojson.* | |
| import zio.json.JsonCodec | |
| import zio.json.ast.Json | |
| import java.util.UUID | |
/** Provides a JDK `HttpClient` whose TLS layer accepts every certificate.
  *
  * The trust manager below has empty check methods, and the SSL parameters
  * carry a `null` endpoint identification algorithm, so neither the
  * certificate chain nor the host name is validated by this client.
  */
object TrustAllCertificates {
  import java.net.Socket
  import java.net.http.HttpClient
  import java.security.SecureRandom
  import java.security.cert.CertificateException
  import java.security.cert.X509Certificate
  import javax.net.ssl.{SSLContext, SSLEngine, SSLParameters, TrustManager, X509ExtendedTrustManager}

  /** Trust manager whose checks never reject anything. */
  private final class AcceptEverything extends X509ExtendedTrustManager {
    override def getAcceptedIssuers: Array[X509Certificate] = Array.empty

    // Plain chain checks.
    override def checkClientTrusted(certs: Array[X509Certificate], authType: String): Unit = ()
    override def checkServerTrusted(certs: Array[X509Certificate], authType: String): Unit = ()

    // Socket / engine aware variants: these are the ones involved in hostname verification.
    override def checkClientTrusted(certs: Array[X509Certificate], authType: String, socket: Socket): Unit = ()
    override def checkServerTrusted(certs: Array[X509Certificate], authType: String, socket: Socket): Unit = ()
    override def checkClientTrusted(certs: Array[X509Certificate], authType: String, engine: SSLEngine): Unit = ()
    override def checkServerTrusted(certs: Array[X509Certificate], authType: String, engine: SSLEngine): Unit = ()
  }

  /** The single all-accepting trust manager, in the array shape `SSLContext.init` expects. */
  val trustAllCerts: Array[TrustManager] = Array[TrustManager](new AcceptEverything)

  /** TLS context initialised with the all-accepting trust manager. */
  val sslContext: SSLContext = {
    val context = SSLContext.getInstance("TLS")
    context.init(null, trustAllCerts, new SecureRandom())
    context
  }

  /** SSL parameters with endpoint identification (hostname verification) switched off. */
  val sslParameters = {
    val parameters = new SSLParameters()
    parameters.setEndpointIdentificationAlgorithm(null) // null disables hostname verification
    parameters
  }

  /** HTTP client wired with the permissive context and parameters above. */
  val httpClient: HttpClient =
    HttpClient
      .newBuilder()
      .sslContext(sslContext)
      .sslParameters(sslParameters)
      .build()
}
/** One entry of the `_cat/indices` JSON listing, restricted to the columns
  * requested by this script (`index`, `uuid`, `status`, `health`).
  */
case class IndexInfo(
  index: String,
  uuid: String,
  status: String,
  health: String
) derives JsonCodec
/** Version-related settings of a single index, as extracted from the flat
  * `_settings` response.
  */
case class IndexSettings(
  index: String,                   // index name
  creation_date_string: String,    // value of `index.creation_date_string`
  created_string: String,          // value of `index.version.created_string`
  upgraded_string: Option[String]  // value of `index.version.upgraded_string`, when present
) derives JsonCodec
/** `source` part of a `_reindex` request body: the index to read from. */
case class ReindexSource(
  index: String
) derives JsonCodec

/** `dest` part of a `_reindex` request body: the index to write to, with an optional `op_type`. */
case class ReindexDest(
  index: String,
  op_type: Option[String] = None
) derives JsonCodec

/** Complete JSON body posted to `_reindex`. */
case class ReindexRequestPayload(
  source: ReindexSource,
  dest: ReindexDest
) derives JsonCodec
/** One element of the `failures` list of a [[ReindexResponse]]. */
case class ReindexFailureItem(
  index: String,
  // `type` is a Scala keyword, hence the backticks
  `type`: Option[String],
  id: String,
  // kept as raw JSON values because the cause can be a complex structure
  cause: Map[String, Json],
  status: Int
) derives JsonCodec
/** Decoded body of a `_reindex` call issued with `wait_for_completion=true`.
  *
  * Field names mirror the JSON keys one-to-one, which is why they are snake_case.
  */
case class ReindexResponse(
  // timing
  took: Long,
  timed_out: Boolean,
  // document counters
  total: Long,
  updated: Long,
  created: Long,
  deleted: Long,
  batches: Long,
  version_conflicts: Long,
  noops: Long,
  // retry counters, keyed by name
  retries: Map[String, Long],
  // throttling
  throttled_millis: Long,
  requests_per_second: Float,
  throttled_until_millis: Long,
  // per-document failures; empty when everything was copied
  failures: List[ReindexFailureItem]
) derives JsonCodec
/** Minimal acknowledgement body; this script decodes it from index `DELETE` responses. */
case class GenericResponse(
  acknowledged: Boolean
) derives JsonCodec
/** Migration program: rebuilds every index whose `index.version.created_string`
  * starts with `1.` so that it gets re-created by the currently running server.
  *
  * Configuration comes from the environment:
  *   - `CEM_ELASTIC_URL`      (defaults to `https://192.168.1.223:9200`)
  *   - `CEM_ELASTIC_USERNAME` (defaults to `admin`)
  *   - `CEM_ELASTIC_PASSWORD` (mandatory, the program fails when it is missing)
  *
  * For each selected index `X` the steps are:
  *   1. `_reindex` `X` into `X_reindexed`
  *   2. delete `X`
  *   3. `_reindex` `X_reindexed` back into `X`
  *   4. delete `X_reindexed`
  *
  * Steps 2 and 4 are destructive, so each one only runs when the preceding
  * `_reindex` neither timed out nor reported failures; otherwise the program
  * fails and leaves both indices in place for inspection.
  *
  * NOTE(review): only documents are copied by this flow; explicit mappings,
  * settings and aliases of the original index are not re-applied here. Confirm
  * this is acceptable for the indices being migrated.
  */
val app = for {
  backend   <- HttpClientZioBackend.scopedUsingClient(TrustAllCertificates.httpClient)
  searchURL <- System.env("CEM_ELASTIC_URL").someOrElse("https://192.168.1.223:9200")
  username  <- System.env("CEM_ELASTIC_USERNAME").someOrElse("admin")
  password  <- System.env("CEM_ELASTIC_PASSWORD").someOrFail(s"NO PASSWORD PROVIDED for $username")
  // Every request carries the same basic-auth credentials.
  authenticated = basicRequest.auth.basic(username, password)

  // ---- list all indices (informational only)
  requestIndices = authenticated
                     .get(uri"$searchURL/_cat/indices?format=json&h=index,uuid,status,health")
                     .response(asJson[List[IndexInfo]])
  _       <- Console.printLine(s"Starting reindexing $searchURL $username")
  indices <- backend.send(requestIndices).map(_.body).absolve
  _       <- Console.printLine(s"Found ${indices.length} index to process")
  _       <- ZIO.foreachDiscard(indices)(index => Console.printLine(index))

  // ---- fetch the version related settings of every index
  requestSettings = authenticated
                      .get(
                        uri"""$searchURL/_settings?human&flat_settings&filter_path=*.settings.index\.version\.created_string,*.settings.index\.version\.upgraded_string,*.settings.index\.creation_date_string"""
                      )
                      .response(asJson[Map[String, Map[String, Map[String, String]]]])
  rawSettings <- backend.send(requestSettings).map(_.body).absolve
  // Safe extraction: an index lacking one of the mandatory keys ends up on the
  // Left side (by name) instead of raising NoSuchElementException.
  parsedSettings = rawSettings.toList.partitionMap { (index, indexSettings) =>
                     val parsed = for {
                       flat         <- indexSettings.get("settings")
                       creationDate <- flat.get("index.creation_date_string")
                       created      <- flat.get("index.version.created_string")
                     } yield IndexSettings(index, creationDate, created, flat.get("index.version.upgraded_string"))
                     parsed.toRight(index)
                   }
  incomplete = parsedSettings._1
  settings   = parsedSettings._2
  _ <- ZIO.foreachDiscard(incomplete.sorted)(index => Console.printLine(s"Ignoring $index : creation/version settings not available"))

  // ---- select the indices created by a 1.x server
  _ <- Console.printLine("------------ to reindex ------------")
  toReindex = settings.sortBy(_.index).filter(_.created_string.startsWith("1."))
  _ <- ZIO.foreachDiscard(toReindex)(indexSettings => Console.printLine(indexSettings))

  // ---- process them one after the other
  _ <- ZIO.foreachDiscard(toReindex) { current =>
         val sourceIndex = current.index
         val targetIndex = s"${sourceIndex}_reindexed"
         for {
           _ <- Console.printLine("--------------------------------------------------")
           _ <- Console.printLine(s"Reindexing $sourceIndex")

           // step 1 : copy source -> temporary index
           payload = ReindexRequestPayload(
                       source = ReindexSource(index = sourceIndex),
                       dest = ReindexDest(index = targetIndex)
                     )
           requestReindex = authenticated
                              .post(uri"$searchURL/_reindex?wait_for_completion=true")
                              .body(asJson(payload))
                              .response(asJson[ReindexResponse])
           reindexResponse <- backend.send(requestReindex).map(_.body).absolve
           _               <- Console.printLine(s"reindex $sourceIndex : failuresCount=${reindexResponse.failures.size}")
           // Never delete the source when the copy is known to be incomplete.
           _ <- ZIO.when(reindexResponse.timed_out || reindexResponse.failures.nonEmpty)(
                  ZIO.fail(
                    new IllegalStateException(
                      s"reindex $sourceIndex -> $targetIndex incomplete (timed_out=${reindexResponse.timed_out}, failures=${reindexResponse.failures.size}), $sourceIndex left untouched"
                    )
                  )
                )

           // step 2 : drop the original index
           requestDelete = authenticated
                             .delete(uri"$searchURL/$sourceIndex")
                             .response(asJson[GenericResponse])
           deleteResponse <- backend.send(requestDelete).map(_.body).absolve
           _              <- Console.printLine(s"delete $sourceIndex response = $deleteResponse")

           // step 3 : copy temporary index -> original name
           renamePayload = ReindexRequestPayload(
                             source = ReindexSource(index = targetIndex),
                             dest = ReindexDest(index = sourceIndex)
                           )
           requestRename = authenticated
                             .post(uri"$searchURL/_reindex?wait_for_completion=true")
                             .body(asJson(renamePayload))
                             .response(asJson[ReindexResponse])
           renameResponse <- backend.send(requestRename).map(_.body).absolve
           _              <- Console.printLine(s"rename $sourceIndex : failuresCount=${renameResponse.failures.size}")
           // At this point the temporary index holds the only full copy: keep it on any doubt.
           _ <- ZIO.when(renameResponse.timed_out || renameResponse.failures.nonEmpty)(
                  ZIO.fail(
                    new IllegalStateException(
                      s"reindex $targetIndex -> $sourceIndex incomplete (timed_out=${renameResponse.timed_out}, failures=${renameResponse.failures.size}), $targetIndex kept"
                    )
                  )
                )

           // step 4 : drop the temporary index
           requestDelete2 = authenticated
                              .delete(uri"$searchURL/$targetIndex")
                              .response(asJson[GenericResponse])
           deleteResponse2 <- backend.send(requestDelete2).map(_.body).absolve
           _               <- Console.printLine(s"delete $targetIndex response = $deleteResponse2")
         } yield ()
       }
} yield ()
// Script entry point: runs `app` on the default ZIO runtime with a default Scope.
// The failure is still logged through `logError`, and the resulting Exit is no
// longer discarded: a failed migration is rethrown so that the process does
// not terminate as if everything had succeeded.
Unsafe.unsafe(implicit u =>
  Runtime.default.unsafe
    .run(app.logError.provide(Scope.default))
    .getOrThrowFiberFailure()
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment