From 399449a1fa7a225847186f1db228dbafc1be7d79 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Sat, 4 Jan 2025 10:54:40 +0800 Subject: [PATCH] [fix] let backup work on azure 1. use https for azure 2. region is unnecessary for gcp. 3. fe pass provider to be 4. be listblob endless loop 5. fe globlist use hierarchy api 6. azure type should be s3 in fe 7. add azure file system adaptor in gson --- be/src/io/fs/azure_obj_storage_client.cpp | 2 +- be/src/util/s3_util.cpp | 4 +- .../apache/doris/analysis/StorageBackend.java | 1 + .../property/S3ClientBEProperties.java | 3 ++ .../apache/doris/fs/obj/AzureObjStorage.java | 42 ++++++++++++++----- .../org/apache/doris/fs/obj/S3ObjStorage.java | 5 +++ .../doris/fs/remote/AzureFileSystem.java | 5 +-- .../apache/doris/persist/gson/GsonUtils.java | 4 +- .../property/PropertyConverterTest.java | 2 +- 9 files changed, 49 insertions(+), 19 deletions(-) diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp index 9f33db3400acdc1..44d45077ebcd210 100644 --- a/be/src/io/fs/azure_obj_storage_client.cpp +++ b/be/src/io/fs/azure_obj_storage_client.cpp @@ -311,7 +311,7 @@ ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePat return _client->ListBlobs(list_opts); }); get_file_file(resp); - while (!resp.NextPageToken->empty()) { + while (resp.NextPageToken.HasValue()) { list_opts.ContinuationToken = resp.NextPageToken; resp = s3_get_rate_limit([&]() { SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency); diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp index f87389b6b3e1191..dece1074e609911 100644 --- a/be/src/util/s3_util.cpp +++ b/be/src/util/s3_util.cpp @@ -254,8 +254,8 @@ std::shared_ptr S3ClientFactory::_create_azure_client( std::make_shared(s3_conf.ak, s3_conf.sk); const std::string container_name = s3_conf.bucket; - const std::string uri = fmt::format("{}://{}.blob.core.windows.net/{}", - config::s3_client_http_scheme, s3_conf.ak, container_name); + const 
std::string uri = + fmt::format("{}://{}.blob.core.windows.net/{}", "https", s3_conf.ak, container_name); auto containerClient = std::make_shared(uri, cred); LOG_INFO("create one azure client with {}", s3_conf.to_string()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java index b07725d2507175d..67a76cec450e360 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java @@ -178,6 +178,7 @@ public TStorageBackendType toThrift() { return TStorageBackendType.JFS; case LOCAL: return TStorageBackendType.LOCAL; + // deprecated case AZURE: return TStorageBackendType.AZURE; default: diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java index 7d8c2668fea03f4..093b74b80ae22e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java @@ -85,6 +85,9 @@ private static Map getBeAWSPropertiesFromS3(Map if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) { beProperties.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE)); } + if (properties.containsKey(S3Properties.PROVIDER)) { + beProperties.put(S3Properties.PROVIDER, properties.get(S3Properties.PROVIDER)); + } return beProperties; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java index 780d2ab9fa30852..cacfd248e5ac7a8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/AzureObjStorage.java @@ -36,6 +36,7 @@ import 
com.azure.storage.blob.batch.BlobBatch; import com.azure.storage.blob.batch.BlobBatchClient; import com.azure.storage.blob.batch.BlobBatchClientBuilder; +import com.azure.storage.blob.models.BlobErrorCode; import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobProperties; import com.azure.storage.blob.models.BlobStorageException; @@ -53,6 +54,7 @@ import java.nio.file.PathMatcher; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -196,6 +198,9 @@ public Status deleteObject(String remotePath) { LOG.info("delete file " + remotePath + " success"); return Status.OK; } catch (BlobStorageException e) { + if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) { + return Status.OK; + } return new Status( Status.ErrCode.COMMON_ERROR, "get file from azure error: " + e.getServiceMessage()); @@ -331,6 +336,7 @@ public Status globList(String remotePath, List result, boolean fileN LOG.info("path pattern {}", pathPattern.toString()); PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString()); + HashSet directorySet = new HashSet<>(); String listPrefix = getLongestPrefix(globPath); LOG.info("azure glob list prefix is {}", listPrefix); ListBlobsOptions options = new ListBlobsOptions().setPrefix(listPrefix); @@ -343,18 +349,32 @@ public Status globList(String remotePath, List result, boolean fileN elementCnt++; java.nio.file.Path blobPath = Paths.get(blobItem.getName()); - if (!matcher.matches(blobPath)) { - continue; + boolean isPrefix = false; + + while (true) { + if (!matcher.matches(blobPath)) { + break; + } + if (directorySet.contains(blobPath.getFileName())) { + break; + } + if (isPrefix) { + directorySet.add(blobPath.getParent().toString()); + } + + matchCnt++; + RemoteFile remoteFile = new RemoteFile( + fileNameOnly ? 
blobPath.getFileName().toString() : constructS3Path(blobPath.toString(), + uri.getBucket()), + !isPrefix, + isPrefix ? -1 : blobItem.getProperties().getContentLength(), + isPrefix ? -1 : blobItem.getProperties().getContentLength(), + isPrefix ? 0 : blobItem.getProperties().getLastModified().getSecond()); + result.add(remoteFile); + + blobPath = blobPath.getParent(); + isPrefix = true; } - matchCnt++; - RemoteFile remoteFile = new RemoteFile( - fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(), - uri.getBucket()), - !blobItem.isPrefix(), - blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(), - blobItem.getProperties().getContentLength(), - blobItem.getProperties().getLastModified().getSecond()); - result.add(remoteFile); } newContinuationToken = pagedResponse.getContinuationToken(); } while (newContinuationToken != null); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java index 69a8ac4d79451b2..edcb54bf8faaed0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java @@ -102,6 +102,11 @@ protected void setProperties(Map properties) { .equalsIgnoreCase("true"); forceParsingByStandardUri = this.properties.getOrDefault(PropertyConverter.FORCE_PARSING_BY_STANDARD_URI, "false").equalsIgnoreCase("true"); + + String endpoint = this.properties.get(S3Properties.ENDPOINT); + String region = this.properties.get(S3Properties.REGION); + + this.properties.put(S3Properties.REGION, PropertyConverter.checkRegion(endpoint, region, S3Properties.REGION)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java index 5004cfd2f1219f8..c116182d3a42416 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/AzureFileSystem.java @@ -17,7 +17,6 @@ package org.apache.doris.fs.remote; -import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; @@ -35,13 +34,13 @@ public class AzureFileSystem extends ObjFileSystem { private static final Logger LOG = LogManager.getLogger(AzureFileSystem.class); public AzureFileSystem(Map properties) { - super(StorageType.AZURE.name(), StorageType.AZURE, new AzureObjStorage(properties)); + super(StorageType.AZURE.name(), StorageType.S3, new AzureObjStorage(properties)); initFsProperties(); } @VisibleForTesting public AzureFileSystem(AzureObjStorage storage) { - super(StorageBackend.StorageType.AZURE.name(), StorageBackend.StorageType.AZURE, storage); + super(StorageType.AZURE.name(), StorageType.S3, storage); initFsProperties(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 6132bbe64eb07d5..8a5516b76169994 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -172,6 +172,7 @@ import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase; import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable; import org.apache.doris.fs.PersistentFileSystem; +import org.apache.doris.fs.remote.AzureFileSystem; import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.fs.remote.ObjFileSystem; import org.apache.doris.fs.remote.S3FileSystem; @@ -570,7 +571,8 @@ public class GsonUtils { .registerSubtype(JFSFileSystem.class, JFSFileSystem.class.getSimpleName()) .registerSubtype(OFSFileSystem.class, 
OFSFileSystem.class.getSimpleName()) .registerSubtype(ObjFileSystem.class, ObjFileSystem.class.getSimpleName()) - .registerSubtype(S3FileSystem.class, S3FileSystem.class.getSimpleName()); + .registerSubtype(S3FileSystem.class, S3FileSystem.class.getSimpleName()) + .registerSubtype(AzureFileSystem.class, AzureFileSystem.class.getSimpleName()); private static RuntimeTypeAdapterFactory jobBackupTypeAdapterFactory diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java index 8967ca5fae03aa9..eacd0bacbb31a85 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java @@ -195,7 +195,7 @@ public void testS3RepositoryPropertiesConverter() throws Exception { CreateRepositoryStmt analyzedStmtNew = createStmt(s3RepoNew); Assertions.assertEquals(analyzedStmtNew.getProperties().size(), 3); Repository repositoryNew = getRepository(analyzedStmtNew, "s3_repo_new"); - Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 4); + Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 5); } private static Repository getRepository(CreateRepositoryStmt analyzedStmt, String name) throws DdlException {