Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[fix] let backup work on azure
Browse files Browse the repository at this point in the history
1. Use HTTPS for Azure.
2. Region is unnecessary for GCP.
3. FE passes the provider to BE.
4. Fix the BE ListBlobs endless loop.
5. FE globList uses the hierarchy API.
6. Azure storage type should be S3 in FE.
7. Add the Azure file system adaptor in Gson.
Yongqiang YANG committed Jan 4, 2025
1 parent 8f87a96 commit 399449a
Showing 9 changed files with 49 additions and 19 deletions.
2 changes: 1 addition & 1 deletion be/src/io/fs/azure_obj_storage_client.cpp
Original file line number Diff line number Diff line change
@@ -311,7 +311,7 @@ ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePat
return _client->ListBlobs(list_opts);
});
get_file_file(resp);
while (!resp.NextPageToken->empty()) {
while (resp.NextPageToken.HasValue()) {
list_opts.ContinuationToken = resp.NextPageToken;
resp = s3_get_rate_limit([&]() {
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
4 changes: 2 additions & 2 deletions be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
@@ -254,8 +254,8 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(s3_conf.ak, s3_conf.sk);

const std::string container_name = s3_conf.bucket;
const std::string uri = fmt::format("{}://{}.blob.core.windows.net/{}",
config::s3_client_http_scheme, s3_conf.ak, container_name);
const std::string uri =
fmt::format("{}://{}.blob.core.windows.net/{}", "https", s3_conf.ak, container_name);

auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
LOG_INFO("create one azure client with {}", s3_conf.to_string());
Original file line number Diff line number Diff line change
@@ -178,6 +178,7 @@ public TStorageBackendType toThrift() {
return TStorageBackendType.JFS;
case LOCAL:
return TStorageBackendType.LOCAL;
// deprecated
case AZURE:
return TStorageBackendType.AZURE;
default:
Original file line number Diff line number Diff line change
@@ -85,6 +85,9 @@ private static Map<String, String> getBeAWSPropertiesFromS3(Map<String, String>
if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
beProperties.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
}
if (properties.containsKey(S3Properties.PROVIDER)) {
beProperties.put(S3Properties.PROVIDER, properties.get(S3Properties.PROVIDER));
}
return beProperties;
}
}
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobStorageException;
@@ -53,6 +54,7 @@
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -196,6 +198,9 @@ public Status deleteObject(String remotePath) {
LOG.info("delete file " + remotePath + " success");
return Status.OK;
} catch (BlobStorageException e) {
if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) {
return Status.OK;
}
return new Status(
Status.ErrCode.COMMON_ERROR,
"get file from azure error: " + e.getServiceMessage());
@@ -331,6 +336,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
LOG.info("path pattern {}", pathPattern.toString());
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());

HashSet<String> directorySet = new HashSet<>();
String listPrefix = getLongestPrefix(globPath);
LOG.info("azure glob list prefix is {}", listPrefix);
ListBlobsOptions options = new ListBlobsOptions().setPrefix(listPrefix);
@@ -343,18 +349,32 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
elementCnt++;
java.nio.file.Path blobPath = Paths.get(blobItem.getName());

if (!matcher.matches(blobPath)) {
continue;
boolean isPrefix = false;

while (true) {
if (!matcher.matches(blobPath)) {
break;
}
if (directorySet.contains(blobPath.getFileName())) {
break;
}
if (isPrefix) {
directorySet.add(blobPath.getParent().toString());
}

matchCnt++;
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
uri.getBucket()),
!isPrefix,
isPrefix ? -1 : blobItem.getProperties().getContentLength(),
isPrefix ? -1 : blobItem.getProperties().getContentLength(),
isPrefix ? 0 : blobItem.getProperties().getLastModified().getSecond());
result.add(remoteFile);

blobPath = blobPath.getParent();
isPrefix = true;
}
matchCnt++;
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
uri.getBucket()),
!blobItem.isPrefix(),
blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
blobItem.getProperties().getContentLength(),
blobItem.getProperties().getLastModified().getSecond());
result.add(remoteFile);
}
newContinuationToken = pagedResponse.getContinuationToken();
} while (newContinuationToken != null);
Original file line number Diff line number Diff line change
@@ -102,6 +102,11 @@ protected void setProperties(Map<String, String> properties) {
.equalsIgnoreCase("true");
forceParsingByStandardUri = this.properties.getOrDefault(PropertyConverter.FORCE_PARSING_BY_STANDARD_URI,
"false").equalsIgnoreCase("true");

String endpoint = this.properties.get(S3Properties.ENDPOINT);
String region = this.properties.get(S3Properties.REGION);

this.properties.put(S3Properties.REGION, PropertyConverter.checkRegion(endpoint, region, S3Properties.REGION));
}

@Override
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@

package org.apache.doris.fs.remote;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
@@ -35,13 +34,13 @@ public class AzureFileSystem extends ObjFileSystem {
private static final Logger LOG = LogManager.getLogger(AzureFileSystem.class);

public AzureFileSystem(Map<String, String> properties) {
super(StorageType.AZURE.name(), StorageType.AZURE, new AzureObjStorage(properties));
super(StorageType.AZURE.name(), StorageType.S3, new AzureObjStorage(properties));
initFsProperties();
}

@VisibleForTesting
public AzureFileSystem(AzureObjStorage storage) {
super(StorageBackend.StorageType.AZURE.name(), StorageBackend.StorageType.AZURE, storage);
super(StorageType.AZURE.name(), StorageType.S3, storage);
initFsProperties();
}

Original file line number Diff line number Diff line change
@@ -172,6 +172,7 @@
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.ObjFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
@@ -570,7 +571,8 @@ public class GsonUtils {
.registerSubtype(JFSFileSystem.class, JFSFileSystem.class.getSimpleName())
.registerSubtype(OFSFileSystem.class, OFSFileSystem.class.getSimpleName())
.registerSubtype(ObjFileSystem.class, ObjFileSystem.class.getSimpleName())
.registerSubtype(S3FileSystem.class, S3FileSystem.class.getSimpleName());
.registerSubtype(S3FileSystem.class, S3FileSystem.class.getSimpleName())
.registerSubtype(AzureFileSystem.class, AzureFileSystem.class.getSimpleName());

private static RuntimeTypeAdapterFactory<org.apache.doris.backup.AbstractJob>
jobBackupTypeAdapterFactory
Original file line number Diff line number Diff line change
@@ -195,7 +195,7 @@ public void testS3RepositoryPropertiesConverter() throws Exception {
CreateRepositoryStmt analyzedStmtNew = createStmt(s3RepoNew);
Assertions.assertEquals(analyzedStmtNew.getProperties().size(), 3);
Repository repositoryNew = getRepository(analyzedStmtNew, "s3_repo_new");
Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 4);
Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 5);
}

private static Repository getRepository(CreateRepositoryStmt analyzedStmt, String name) throws DdlException {

0 comments on commit 399449a

Please sign in to comment.