diff --git a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala index f3d252d413b..d692c574b8e 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala @@ -38,7 +38,8 @@ import scala.jdk.CollectionConverters._ object S3StorageClient { val MINIMUM_NUM_OF_MULTIPART_S3_PART: Long = 5L * 1024 * 1024 // 5 MiB val MAXIMUM_NUM_OF_MULTIPART_S3_PARTS = 10_000 - val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6 + // Keep in sync with LakeFS https://github.com/treeverse/lakeFS/pull/10180 + val PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 24 // Initialize MinIO-compatible S3 Client private lazy val s3Client: S3Client = { diff --git a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala index 24253c3a92d..a71861491a6 100644 --- a/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala +++ b/file-service/src/test/scala/org/apache/texera/service/resource/DatasetResourceSpec.scala @@ -670,6 +670,20 @@ class DatasetResourceSpec sessionRecord.getUploadId } + private def expireUploadSession(uploadId: String): Unit = { + val expiredHoursAgo = S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS + 1 + getDSLContext + .update(DATASET_UPLOAD_SESSION) + .set( + DATASET_UPLOAD_SESSION.CREATED_AT, + DSL + .field(s"current_timestamp - interval '${expiredHoursAgo} hours'") + .cast(classOf[java.time.OffsetDateTime]) + ) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId)) + .execute() + } + private def assertPlaceholdersCreated(uploadId: String, expectedParts: Int): Unit = { val rows = fetchPartRows(uploadId).sortBy(_.getPartNumber) rows.size shouldEqual expectedParts @@ -717,17 
+731,9 @@ class DatasetResourceSpec initUpload(fpB, numParts = 2).getStatus shouldEqual 200 initUpload(fpA, numParts = 2).getStatus shouldEqual 200 - // Expire fpB by pushing created_at back > 6 hours. + // Expire fpB by pushing created_at back beyond the real session expiration window. val uploadIdB = fetchUploadIdOrFail(fpB) - val tableName = DATASET_UPLOAD_SESSION.getName // typically "dataset_upload_session" - getDSLContext - .update(DATASET_UPLOAD_SESSION) - .set( - DATASET_UPLOAD_SESSION.CREATED_AT, - DSL.field("current_timestamp - interval '7 hours'").cast(classOf[java.time.OffsetDateTime]) - ) - .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadIdB)) - .execute() + expireUploadSession(uploadIdB) val listed = listUploads() listed shouldEqual listed.sorted @@ -905,19 +911,8 @@ class DatasetResourceSpec uploadPart(filePath, 1, minPartBytes(1.toByte)).getStatus shouldEqual 200 fetchPartRows(oldUploadId).find(_.getPartNumber == 1).get.getEtag.trim should not be "" - // Age the session so it is definitely expired (> PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS = 6) - val expireHrs = S3StorageClient.PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS - - getDSLContext - .update(DATASET_UPLOAD_SESSION) - .set( - DATASET_UPLOAD_SESSION.CREATED_AT, - DSL - .field(s"current_timestamp - interval '${expireHrs + 1} hours'") - .cast(classOf[java.time.OffsetDateTime]) - ) - .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(oldUploadId)) - .execute() + // Age the session so it is definitely expired (> PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS) + expireUploadSession(oldUploadId) // Same init config again -> should restart because it's expired val r2 = initUpload(filePath, numParts = 2, lastPartBytes = 123)