博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊flink的BlobWriter
阅读量:5865 次
发布时间:2019-06-19

本文共 17650 字,大约阅读时间需要 58 分钟。

  hot3.png

本文主要研究一下flink的BlobWriter

BlobWriter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobWriter.java

/** * BlobWriter is used to upload data to the BLOB store. */public interface BlobWriter {	Logger LOG = LoggerFactory.getLogger(BlobWriter.class);	/**	 * Uploads the data of the given byte array for the given job to the BLOB server and makes it	 * a permanent BLOB.	 *	 * @param jobId	 * 		the ID of the job the BLOB belongs to	 * @param value	 * 		the buffer to upload	 *	 * @return the computed BLOB key identifying the BLOB on the server	 *	 * @throws IOException	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA	 * 		store	 */	PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException;	/**	 * Uploads the data from the given input stream for the given job to the BLOB server and makes it	 * a permanent BLOB.	 *	 * @param jobId	 * 		ID of the job this blob belongs to	 * @param inputStream	 * 		the input stream to read the data from	 *	 * @return the computed BLOB key identifying the BLOB on the server	 *	 * @throws IOException	 * 		thrown if an I/O error occurs while reading the data from the input stream, writing it to a	 * 		local file, or uploading it to the HA store	 */	PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException;	/**	 * Returns the min size before data will be offloaded to the BLOB store.	 *	 * @return minimum offloading size	 */	int getMinOffloadingSize();	/**	 * Serializes the given value and offloads it to the BlobServer if its size exceeds the minimum	 * offloading size of the BlobServer.	 *	 * @param value to serialize	 * @param jobId to which the value belongs.	 * @param blobWriter to use to offload the serialized value	 * @param 
type of the value to serialize * @return Either the serialized value or the stored blob key * @throws IOException if the data cannot be serialized */ static
Either
, PermanentBlobKey> serializeAndTryOffload( T value, JobID jobId, BlobWriter blobWriter) throws IOException { Preconditions.checkNotNull(value); Preconditions.checkNotNull(jobId); Preconditions.checkNotNull(blobWriter); final SerializedValue
serializedValue = new SerializedValue<>(value); if (serializedValue.getByteArray().length < blobWriter.getMinOffloadingSize()) { return Either.Left(new SerializedValue<>(value)); } else { try { final PermanentBlobKey permanentBlobKey = blobWriter.putPermanent(jobId, serializedValue.getByteArray()); return Either.Right(permanentBlobKey); } catch (IOException e) { LOG.warn("Failed to offload value {} for job {} to BLOB store.", value, jobId, e); return Either.Left(serializedValue); } } }}
  • BlobWriter定义了putPermanent、getMinOffloadingSize方法,同时还提供了serializeAndTryOffload静态方法用于序列化指定value并在其大小超过minimum offloading size时调用blobWriter.putPermanent存放到BlobServer

BlobServer

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java

/** * This class implements the BLOB server. The BLOB server is responsible for listening for incoming requests and * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store * the BLOBs or temporarily cache them. */public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {	//......	@Override	public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {		checkNotNull(jobId);		return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB);	}	@Override	public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {		checkNotNull(jobId);		return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB);	}	/**	 * Returns the configuration used by the BLOB server.	 *	 * @return configuration	 */	@Override	public final int getMinOffloadingSize() {		return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);	}	/**	 * Uploads the data of the given byte array for the given job to the BLOB server.	 *	 * @param jobId	 * 		the ID of the job the BLOB belongs to	 * @param value	 * 		the buffer to upload	 * @param blobType	 * 		whether to make the data permanent or transient	 *	 * @return the computed BLOB key identifying the BLOB on the server	 *	 * @throws IOException	 * 		thrown if an I/O error occurs while writing it to a local file, or uploading it to the HA	 * 		store	 */	private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)			throws IOException {		if (LOG.isDebugEnabled()) {			LOG.debug("Received PUT call for BLOB of job {}.", jobId);		}		File incomingFile = createTemporaryFilename();		MessageDigest md = BlobUtils.createMessageDigest();		BlobKey blobKey = null;		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {			md.update(value);			fos.write(value);		} catch (IOException ioe) {			// delete incomingFile from a failed download			if (!incomingFile.delete() && incomingFile.exists()) {				LOG.warn("Could not delete the staging file {} for job {}.",					incomingFile, jobId);			}			throw ioe;		}		try {			// persist file			blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);			return blobKey;		} finally {			// delete incomingFile from a failed download			if (!incomingFile.delete() && incomingFile.exists()) {				LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",					incomingFile, blobKey, jobId);			}		}	}	/**	 * Uploads the data from the given input stream for the given job to the BLOB server.	 *	 * @param jobId	 * 		the ID of the job the BLOB belongs to	 * @param inputStream	 * 		the input stream to read the data from	 * @param blobType	 * 		whether to make the data permanent or transient	 *	 * @return the computed BLOB key identifying the BLOB on the server	 *	 * @throws IOException	 * 		thrown if an I/O error occurs while reading the data from the input stream, writing it to a	 * 		local file, or uploading it to the HA store	 */	private BlobKey putInputStream(			@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)			throws IOException {		if (LOG.isDebugEnabled()) {			LOG.debug("Received PUT call for BLOB of job {}.", jobId);		}		File incomingFile = createTemporaryFilename();		MessageDigest md = BlobUtils.createMessageDigest();		BlobKey blobKey = null;		try (FileOutputStream fos = new FileOutputStream(incomingFile)) {			// read stream			byte[] buf = new byte[BUFFER_SIZE];			while (true) {				final int bytesRead = inputStream.read(buf);				if (bytesRead == -1) {					// done					break;				}				fos.write(buf, 0, bytesRead);				md.update(buf, 0, bytesRead);			}			// persist file			blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);			return blobKey;		} finally {			// delete incomingFile from a failed download			if (!incomingFile.delete() && incomingFile.exists()) {				LOG.warn("Could not delete the staging file {} for blob key {} and job {}.",					incomingFile, blobKey, jobId);			}		}	}	/**	 * Moves the temporary incomingFile to its permanent location where it is available for	 * use.	 *	 * @param incomingFile	 * 		temporary file created during transfer	 * @param jobId	 * 		ID of the job this blob belongs to or null if job-unrelated	 * @param digest	 * 		BLOB content digest, i.e. hash	 * @param blobType	 * 		whether this file is a permanent or transient BLOB	 *	 * @return unique BLOB key that identifies the BLOB on the server	 *	 * @throws IOException	 * 		thrown if an I/O error occurs while moving the file or uploading it to the HA store	 */	BlobKey moveTempFileToStore(			File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType)			throws IOException {		int retries = 10;		int attempt = 0;		while (true) {			// add unique component independent of the BLOB content			BlobKey blobKey = BlobKey.createKey(blobType, digest);			File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);			// try again until the key is unique (put the existence check into the lock!)			readWriteLock.writeLock().lock();			try {				if (!storageFile.exists()) {					BlobUtils.moveTempFileToStore(						incomingFile, jobId, blobKey, storageFile, LOG,						blobKey instanceof PermanentBlobKey ? blobStore : null);					// add TTL for transient BLOBs:					if (blobKey instanceof TransientBlobKey) {						// must be inside read or write lock to add a TTL						blobExpiryTimes							.put(Tuple2.of(jobId, (TransientBlobKey) blobKey),								System.currentTimeMillis() + cleanupInterval);					}					return blobKey;				}			} finally {				readWriteLock.writeLock().unlock();			}			++attempt;			if (attempt >= retries) {				String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ".";				LOG.error(message + " No retries left.");				throw new IOException(message);			} else {				if (LOG.isDebugEnabled()) {					LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})",						jobId, attempt, storageFile.getAbsolutePath());				}			}		}	}	/**	 * Returns a temporary file inside the BLOB server's incoming directory.	 *	 * @return a temporary file inside the BLOB server's incoming directory	 *	 * @throws IOException	 * 		if creating the directory fails	 */	File createTemporaryFilename() throws IOException {		return new File(BlobUtils.getIncomingDirectory(storageDir),				String.format("temp-%08d", tempFileCounter.getAndIncrement()));	}	//......}
  • BlobServer实现了BlobWriter接口,putPermanent方法分别用到了putBuffer及putInputStream方法,而getMinOffloadingSize方法则从blobServiceConfiguration获取BlobServerOptions.OFFLOAD_MINSIZE配置,默认是1M
  • putBuffer方法接收byte[]参数,它先把byte[]写入到临时文件,之后调用moveTempFileToStore方法进行持久化;putInputStream方法接收InputStream参数,它也是先把InputStream写入到临时文件,然后调用moveTempFileToStore方法进行持久化
  • moveTempFileToStore方法调用了BlobUtils.moveTempFileToStore将本地临时文件转移到permanent location;其中storageDir由BlobUtils.initLocalStorageDirectory(config)来初始化,而storageFile通过BlobUtils.getStorageLocation(storageDir, jobId, blobKey)来获取

BlobUtils

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java

/** * Utility class to work with blob data. */public class BlobUtils {	//......	/**	 * Creates a local storage directory for a blob service under the configuration parameter given	 * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is null or empty, we will	 * fall back to Flink's temp directories (given by	 * {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at	 * random.	 *	 * @param config	 * 		Flink configuration	 *	 * @return a new local storage directory	 *	 * @throws IOException	 * 		thrown if the local file storage cannot be created or is not usable	 */	static File initLocalStorageDirectory(Configuration config) throws IOException {		String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY);		File baseDir;		if (StringUtils.isNullOrWhitespaceOnly(basePath)) {			final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);			baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);		}		else {			baseDir = new File(basePath);		}		File storageDir;		// NOTE: although we will be using UUIDs, there may be collisions		int maxAttempts = 10;		for (int attempt = 0; attempt < maxAttempts; attempt++) {			storageDir = new File(baseDir, String.format(					"blobStore-%s", UUID.randomUUID().toString()));			// Create the storage dir if it doesn't exist. Only return it when the operation was			// successful.			if (storageDir.mkdirs()) {				return storageDir;			}		}		// max attempts exceeded to find a storage directory		throw new IOException("Could not create storage directory for BLOB store in '" + baseDir + "'.");	}	/**	 * Returns the (designated) physical storage location of the BLOB with the given key.	 *	 * @param storageDir	 * 		storage directory used be the BLOB service	 * @param key	 * 		the key identifying the BLOB	 * @param jobId	 * 		ID of the job for the incoming files (or null if job-unrelated)	 *	 * @return the (designated) physical storage location of the BLOB	 *	 * @throws IOException	 * 		if creating the directory fails	 */	static File getStorageLocation(			File storageDir, @Nullable JobID jobId, BlobKey key) throws IOException {		File file = new File(getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));		Files.createDirectories(file.getParentFile().toPath());		return file;	}	/**	 * Returns the path for the given blob key.	 *	 * 

The returned path can be used with the (local or HA) BLOB store file system back-end for * recovery purposes and follows the same scheme as {@link #getStorageLocation(File, JobID, * BlobKey)}. * * @param storageDir * storage directory used be the BLOB service * @param key * the key identifying the BLOB * @param jobId * ID of the job for the incoming files * * @return the path to the given BLOB */ static String getStorageLocationPath( String storageDir, @Nullable JobID jobId, BlobKey key) { if (jobId == null) { // format: $base/no_job/blob_$key return String.format("%s/%s/%s%s", storageDir, NO_JOB_DIR_PREFIX, BLOB_FILE_PREFIX, key.toString()); } else { // format: $base/job_$jobId/blob_$key return String.format("%s/%s%s/%s%s", storageDir, JOB_DIR_PREFIX, jobId.toString(), BLOB_FILE_PREFIX, key.toString()); } } /** * Moves the temporary incomingFile to its permanent location where it is available for * use (not thread-safe!). * * @param incomingFile * temporary file created during transfer * @param jobId * ID of the job this blob belongs to or null if job-unrelated * @param blobKey * BLOB key identifying the file * @param storageFile * (local) file where the blob is/should be stored * @param log * logger for debug information * @param blobStore * HA store (or null if unavailable) * * @throws IOException * thrown if an I/O error occurs while moving the file or uploading it to the HA store */ static void moveTempFileToStore( File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile, Logger log, @Nullable BlobStore blobStore) throws IOException { try { // first check whether the file already exists if (!storageFile.exists()) { try { // only move the file if it does not yet exist Files.move(incomingFile.toPath(), storageFile.toPath()); incomingFile = null; } catch (FileAlreadyExistsException ignored) { log.warn("Detected concurrent file modifications. This should only happen if multiple" + "BlobServer use the same storage directory."); // we cannot be sure at this point whether the file has already been uploaded to the blob // store or not. Even if the blobStore might shortly be in an inconsistent state, we have // to persist the blob. Otherwise we might not be able to recover the job. } if (blobStore != null) { // only the one moving the incoming file to its final destination is allowed to upload the // file to the blob store blobStore.put(storageFile, jobId, blobKey); } } else { log.warn("File upload for an existing file with key {} for job {}. This may indicate a duplicate upload or a hash collision. Ignoring newest upload.", blobKey, jobId); } storageFile = null; } finally { // we failed to either create the local storage file or to upload it --> try to delete the local file // while still having the write lock if (storageFile != null && !storageFile.delete() && storageFile.exists()) { log.warn("Could not delete the storage file {}.", storageFile); } if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) { log.warn("Could not delete the staging file {} for blob key {} and job {}.", incomingFile, blobKey, jobId); } } } //......}

  • initLocalStorageDirectory方法从配置文件读取BlobServerOptions.STORAGE_DIRECTORY配置(blob.storage.directory),如果没有配置,则通过ConfigurationUtils.parseTempDirectories来获取tmpDirPaths,然后随机选一个作为baseDir,而storageDir目录则是baseDir的子目录,其目录名前缀为blobStore
  • getStorageLocation方法则在storageDir的基础上根据JobID及BlobKey构造具体的存储路径,其格式为$base/no_job/blob_$key或者$base/job_$jobId/blob_$key
  • moveTempFileToStore方法则在目标文件不存在的场景下使用Files.move将incomingFile转移到storageFile,如果blobStore不为null,还会将storageFile放入到BlobStore

小结

  • BlobWriter定义了putPermanent、getMinOffloadingSize方法,同时还提供了serializeAndTryOffload静态方法用于序列化指定value并在其大小超过minimum offloading size时调用blobWriter.putPermanent存放到BlobServer
  • BlobServer实现了BlobWriter接口,putPermanent方法分别用到了putBuffer及putInputStream方法,而getMinOffloadingSize方法则从blobServiceConfiguration获取BlobServerOptions.OFFLOAD_MINSIZE配置,默认是1M;putBuffer方法接收byte[]参数,它先把byte[]写入到临时文件,之后调用moveTempFileToStore方法进行持久化;putInputStream方法接收InputStream参数,它也是先把InputStream写入到临时文件,然后调用moveTempFileToStore方法进行持久化;moveTempFileToStore方法调用了BlobUtils.moveTempFileToStore将本地临时文件转移到permanent location;其中storageDir由BlobUtils.initLocalStorageDirectory(config)来初始化,而storageFile通过BlobUtils.getStorageLocation(storageDir, jobId, blobKey)来获取
  • BlobUtils的initLocalStorageDirectory方法从配置文件读取BlobServerOptions.STORAGE_DIRECTORY配置(blob.storage.directory),如果没有配置,则通过ConfigurationUtils.parseTempDirectories来获取tmpDirPaths,然后随机选一个作为baseDir,而storageDir目录则是baseDir的子目录,其目录名前缀为blobStore;getStorageLocation方法则在storageDir的基础上根据JobID及BlobKey构造具体的存储路径,其格式为$base/no_job/blob_$key或者$base/job_$jobId/blob_$key;moveTempFileToStore方法则在目标文件不存在的场景下使用Files.move将incomingFile转移到storageFile,如果blobStore不为null,还会将storageFile放入到BlobStore

doc

转载于:https://my.oschina.net/go4it/blog/3015971

你可能感兴趣的文章
php框架Laravel 5 中文文档 gitbook版
查看>>
关于在google浏览器与firefox、ie下:line-height 单位讨论?
查看>>
TextInputLayout简单说使用
查看>>
sgu 170
查看>>
Oracle PL/SQL常用函数列表
查看>>
MyBaties自动生成表结构
查看>>
微信开发
查看>>
SSD HHD机械硬盘 MaxIO
查看>>
Nutch2.1+mysql+solr3.6.1安装部署
查看>>
cannot declare have static linkage
查看>>
中文域名在线转码
查看>>
论双1的重要性
查看>>
Ubuntu 12.04使用MySQL
查看>>
windows server 2008的安装
查看>>
PHP数组和Json之间的转换
查看>>
[css]-flex布局
查看>>
一次FULL GC问题的排查
查看>>
gerrit与crowdid, openid集成,设置openIdSsoUrl 直接登录
查看>>
一个账号同时只能在同一个设备上登陆
查看>>
Java学习资料-Java类加载器
查看>>