序
本文主要研究一下flink的BlobServer
BlobServer
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService { /** The log object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class); /** Counter to generate unique names for temporary files. */ private final AtomicLong tempFileCounter = new AtomicLong(0); /** The server socket listening for incoming connections. */ private final ServerSocket serverSocket; /** Blob Server configuration. */ private final Configuration blobServiceConfiguration; /** Indicates whether a shutdown of server component has been requested. */ private final AtomicBoolean shutdownRequested = new AtomicBoolean(); /** Root directory for local file storage. */ private final File storageDir; /** Blob store for distributed file storage, e.g. in HA. */ private final BlobStore blobStore; /** Set of currently running threads. */ private final SetactiveConnections = new HashSet<>(); /** The maximum number of concurrent connections. */ private final int maxConnections; /** Lock guarding concurrent file accesses. */ private final ReadWriteLock readWriteLock; /** * Shutdown hook thread to ensure deletion of the local storage directory. */ private final Thread shutdownHook; // -------------------------------------------------------------------------------------------- /** * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link * #getFile} methods. **/ private final ConcurrentHashMap , Long> blobExpiryTimes = new ConcurrentHashMap<>(); /** Time interval (ms) to run the cleanup task; also used as the default TTL. */ private final long cleanupInterval; /** * Timer task to execute the cleanup at regular intervals. */ private final Timer cleanupTimer; /** * Instantiates a new BLOB server and binds it to a free network port. 
* * @param config Configuration to be used to instantiate the BlobServer * @param blobStore BlobStore to store blobs persistently * * @throws IOException * thrown if the BLOB server cannot bind to a free network port or if the * (local or distributed) file storage cannot be created or is not usable */ public BlobServer(Configuration config, BlobStore blobStore) throws IOException { this.blobServiceConfiguration = checkNotNull(config); this.blobStore = checkNotNull(blobStore); this.readWriteLock = new ReentrantReadWriteLock(); // configure and create the storage directory this.storageDir = BlobUtils.initLocalStorageDirectory(config); LOG.info("Created BLOB server storage directory {}", storageDir); // configure the maximum number of concurrent connections final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT); if (maxConnections >= 1) { this.maxConnections = maxConnections; } else { LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue()); this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue(); } // configure the backlog of connections int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG); if (backlog < 1) { LOG.warn("Invalid value for BLOB connection backlog: {}. 
Using default value of {}", backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue()); backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue(); } // Initializing the clean up task this.cleanupTimer = new Timer(true); this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000; this.cleanupTimer .schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(), storageDir, LOG), cleanupInterval, cleanupInterval); this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG); // ----------------------- start the server ------------------- final String serverPortRange = config.getString(BlobServerOptions.PORT); final Iterator ports = NetUtils.getPortRangeFromString(serverPortRange); final ServerSocketFactory socketFactory; if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) { try { socketFactory = SSLUtils.createSSLServerSocketFactory(config); } catch (Exception e) { throw new IOException("Failed to initialize SSL for the blob server", e); } } else { socketFactory = ServerSocketFactory.getDefault(); } final int finalBacklog = backlog; this.serverSocket = NetUtils.createSocketFromPorts(ports, (port) -> socketFactory.createServerSocket(port, finalBacklog)); if (serverSocket == null) { throw new IOException("Unable to open BLOB Server in specified port range: " + serverPortRange); } // start the server thread setName("BLOB Server listener at " + getPort()); setDaemon(true); if (LOG.isInfoEnabled()) { LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}", serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, backlog); } } //...... 
@Override public void run() { try { while (!this.shutdownRequested.get()) { BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this); try { synchronized (activeConnections) { while (activeConnections.size() >= maxConnections) { activeConnections.wait(2000); } activeConnections.add(conn); } conn.start(); conn = null; } finally { if (conn != null) { conn.close(); synchronized (activeConnections) { activeConnections.remove(conn); } } } } } catch (Throwable t) { if (!this.shutdownRequested.get()) { LOG.error("BLOB server stopped working. Shutting down", t); try { close(); } catch (Throwable closeThrowable) { LOG.error("Could not properly close the BlobServer.", closeThrowable); } } } } /** * Shuts down the BLOB server. */ @Override public void close() throws IOException { cleanupTimer.cancel(); if (shutdownRequested.compareAndSet(false, true)) { Exception exception = null; try { this.serverSocket.close(); } catch (IOException ioe) { exception = ioe; } // wake the thread up, in case it is waiting on some operation interrupt(); try { join(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); LOG.debug("Error while waiting for this thread to die.", ie); } synchronized (activeConnections) { if (!activeConnections.isEmpty()) { for (BlobServerConnection conn : activeConnections) { LOG.debug("Shutting down connection {}.", conn.getName()); conn.close(); } activeConnections.clear(); } } // Clean up the storage directory try { FileUtils.deleteDirectory(storageDir); } catch (IOException e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); } // Remove shutdown hook to prevent resource leaks ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); if (LOG.isInfoEnabled()) { LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort()); } ExceptionUtils.tryRethrowIOException(exception); } } //......}
- BlobServer继承了Thread,同时实现了BlobService、BlobWriter、PermanentBlobService、TransientBlobService接口
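- 其构造器在启用了内部SSL且BlobServerOptions.SSL_ENABLED为true时使用SSLUtils.createSSLServerSocketFactory创建socketFactory,否则使用ServerSocketFactory.getDefault()(即DefaultServerSocketFactory),然后通过NetUtils.createSocketFromPorts在配置的端口范围内创建ServerSocket;同时使用ShutdownHookUtil.addShutdownHook注册了shutdownHook,在shutdown的时候会调用close方法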
- 重写了Thread的run方法,该方法在没有接收到shutdown请求的时候,会不断循环等待serverSocket.accept(),并用接收到的socket创建BlobServerConnection;如果当前activeConnections的数量达到或超过了maxConnections,则会循环调用activeConnections.wait(2000)进行等待,直到连接数低于maxConnections后才将连接维护到activeConnections,然后调用conn.start()
BlobServerConnection
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
class BlobServerConnection extends Thread { /** The log object used for debugging. */ private static final Logger LOG = LoggerFactory.getLogger(BlobServerConnection.class); /** The socket to communicate with the client. */ private final Socket clientSocket; /** The BLOB server. */ private final BlobServer blobServer; /** Read lock to synchronize file accesses. */ private final Lock readLock; BlobServerConnection(Socket clientSocket, BlobServer blobServer) { super("BLOB connection for " + clientSocket.getRemoteSocketAddress()); setDaemon(true); this.clientSocket = clientSocket; this.blobServer = checkNotNull(blobServer); ReadWriteLock readWriteLock = blobServer.getReadWriteLock(); this.readLock = readWriteLock.readLock(); } // -------------------------------------------------------------------------------------------- // Connection / Thread methods // -------------------------------------------------------------------------------------------- /** * Main connection work method. Accepts requests until the other side closes the connection. 
*/ @Override public void run() { try { final InputStream inputStream = this.clientSocket.getInputStream(); final OutputStream outputStream = this.clientSocket.getOutputStream(); while (true) { // Read the requested operation final int operation = inputStream.read(); if (operation < 0) { // done, no one is asking anything from us return; } switch (operation) { case PUT_OPERATION: put(inputStream, outputStream, new byte[BUFFER_SIZE]); break; case GET_OPERATION: get(inputStream, outputStream, new byte[BUFFER_SIZE]); break; default: throw new IOException("Unknown operation " + operation); } } } catch (SocketException e) { // this happens when the remote site closes the connection LOG.debug("Socket connection closed", e); } catch (Throwable t) { LOG.error("Error while executing BLOB connection.", t); } finally { closeSilently(clientSocket, LOG); blobServer.unregisterConnection(this); } } /** * Closes the connection socket and lets the thread exit. */ public void close() { closeSilently(clientSocket, LOG); interrupt(); } // -------------------------------------------------------------------------------------------- // Actions // -------------------------------------------------------------------------------------------- private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { /* * Retrieve the file from the (distributed?) BLOB store and store it * locally, then send it to the service which requested it. * * Instead, we could send it from the distributed store directly but * chances are high that if there is one request, there will be more * so a local cache makes more sense. 
*/ final File blobFile; final JobID jobId; final BlobKey blobKey; try { // read HEADER contents: job ID, key, HA mode/permanent or transient BLOB final int mode = inputStream.read(); if (mode < 0) { throw new EOFException("Premature end of GET request"); } // Receive the jobId and key if (mode == JOB_UNRELATED_CONTENT) { jobId = null; } else if (mode == JOB_RELATED_CONTENT) { byte[] jidBytes = new byte[JobID.SIZE]; readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); jobId = JobID.fromByteArray(jidBytes); } else { throw new IOException("Unknown type of BLOB addressing: " + mode + '.'); } blobKey = BlobKey.readFromInputStream(inputStream); checkArgument(blobKey instanceof TransientBlobKey || jobId != null, "Invalid BLOB addressing for permanent BLOBs"); if (LOG.isDebugEnabled()) { LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId, blobKey, clientSocket.getInetAddress()); } // the file's (destined) location at the BlobServer blobFile = blobServer.getStorageLocation(jobId, blobKey); // up to here, an error can give a good message } catch (Throwable t) { LOG.error("GET operation from {} failed.", clientSocket.getInetAddress(), t); try { writeErrorToStream(outputStream, t); } catch (IOException e) { // since we are in an exception case, it means that we could not send the error // ignore this } clientSocket.close(); return; } try { readLock.lock(); try { // copy the file to local store if it does not exist yet try { blobServer.getFileInternal(jobId, blobKey, blobFile); // enforce a 2GB max for now (otherwise the protocol's length field needs to be increased) if (blobFile.length() > Integer.MAX_VALUE) { throw new IOException("BLOB size exceeds the maximum size (2 GB)."); } outputStream.write(RETURN_OKAY); } catch (Throwable t) { LOG.error("GET operation failed for BLOB {}/{} from {}.", jobId, blobKey, clientSocket.getInetAddress(), t); try { writeErrorToStream(outputStream, t); } catch (IOException e) { // since we are in an exception case, it means 
that we could not send the error // ignore this } clientSocket.close(); return; } // from here on, we started sending data, so all we can do is close the connection when something happens int blobLen = (int) blobFile.length(); writeLength(blobLen, outputStream); try (FileInputStream fis = new FileInputStream(blobFile)) { int bytesRemaining = blobLen; while (bytesRemaining > 0) { int read = fis.read(buf); if (read < 0) { throw new IOException("Premature end of BLOB file stream for " + blobFile.getAbsolutePath()); } outputStream.write(buf, 0, read); bytesRemaining -= read; } } } finally { readLock.unlock(); } // on successful transfer, delete transient files int result = inputStream.read(); if (result < 0) { throw new EOFException("Premature end of GET request"); } else if (blobKey instanceof TransientBlobKey && result == RETURN_OKAY) { // ignore the result from the operation if (!blobServer.deleteInternal(jobId, (TransientBlobKey) blobKey)) { LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", jobId, blobKey, clientSocket.getInetAddress()); } } } catch (SocketException e) { // happens when the other side disconnects LOG.debug("Socket connection closed", e); } catch (Throwable t) { LOG.error("GET operation failed", t); clientSocket.close(); } } private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException { File incomingFile = null; try { // read HEADER contents: job ID, HA mode/permanent or transient BLOB final int mode = inputStream.read(); if (mode < 0) { throw new EOFException("Premature end of PUT request"); } final JobID jobId; if (mode == JOB_UNRELATED_CONTENT) { jobId = null; } else if (mode == JOB_RELATED_CONTENT) { byte[] jidBytes = new byte[JobID.SIZE]; readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID"); jobId = JobID.fromByteArray(jidBytes); } else { throw new IOException("Unknown type of BLOB addressing."); } final BlobKey.BlobType blobType; { final int read = inputStream.read(); if (read < 0) { throw 
new EOFException("Read an incomplete BLOB type"); } else if (read == TRANSIENT_BLOB.ordinal()) { blobType = TRANSIENT_BLOB; } else if (read == PERMANENT_BLOB.ordinal()) { blobType = PERMANENT_BLOB; checkArgument(jobId != null, "Invalid BLOB addressing for permanent BLOBs"); } else { throw new IOException("Invalid data received for the BLOB type: " + read); } } if (LOG.isDebugEnabled()) { LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId, clientSocket.getInetAddress()); } incomingFile = blobServer.createTemporaryFilename(); byte[] digest = readFileFully(inputStream, incomingFile, buf); BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType); // Return computed key to client for validation outputStream.write(RETURN_OKAY); blobKey.writeToOutputStream(outputStream); } catch (SocketException e) { // happens when the other side disconnects LOG.debug("Socket connection closed", e); } catch (Throwable t) { LOG.error("PUT operation failed", t); try { writeErrorToStream(outputStream, t); } catch (IOException e) { // since we are in an exception case, it means not much that we could not send the error // ignore this } clientSocket.close(); } finally { if (incomingFile != null) { if (!incomingFile.delete() && incomingFile.exists()) { LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath()); } } } } private static byte[] readFileFully( final InputStream inputStream, final File incomingFile, final byte[] buf) throws IOException { MessageDigest md = BlobUtils.createMessageDigest(); try (FileOutputStream fos = new FileOutputStream(incomingFile)) { while (true) { final int bytesExpected = readLength(inputStream); if (bytesExpected == -1) { // done break; } if (bytesExpected > BUFFER_SIZE) { throw new IOException( "Unexpected number of incoming bytes: " + bytesExpected); } readFully(inputStream, buf, 0, bytesExpected, "buffer"); fos.write(buf, 0, bytesExpected); md.update(buf, 0, bytesExpected); } 
return md.digest(); } } // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException { byte[] bytes = InstantiationUtil.serializeObject(t); out.write(RETURN_ERROR); writeLength(bytes.length, out); out.write(bytes); }}
- BlobServerConnection继承了Thread,它的构造器接收clientSocket及blobServer;它覆盖了Thread的run方法,该方法首先从clientSocket读取请求的operation,如果是PUT_OPERATION则调用put方法,如果是GET_OPERATION则调用get方法
- put方法从inputStream读取jobId及blobType,之后创建incomingFile,将输入的文件先存储到临时文件,然后调用blobServer.moveTempFileToStore方法存储到blob server
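- get方法从inputStream读取jobId及blobKey,之后调用blobServer.getStorageLocation获取blobFile,然后在持有readLock的情况下调用blobServer.getFileInternal确保文件已拷贝到local store,再将文件长度及内容写入到outputStream;传输完成后,如果blobKey是TransientBlobKey且客户端返回了RETURN_OKAY,则调用blobServer.deleteInternal删除该transient blob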
小结
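- BlobServer继承了Thread,同时实现了BlobService、BlobWriter、PermanentBlobService、TransientBlobService接口;其构造器在启用了内部SSL且BlobServerOptions.SSL_ENABLED为true时使用SSLUtils.createSSLServerSocketFactory创建socketFactory,否则使用ServerSocketFactory.getDefault()(即DefaultServerSocketFactory),然后通过NetUtils.createSocketFromPorts在配置的端口范围内创建ServerSocket;同时使用ShutdownHookUtil.addShutdownHook注册了shutdownHook,在shutdown的时候会调用close方法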
- BlobServer重写了Thread的run方法,该方法在没有接收到shutdown请求的时候,会不断循环等待serverSocket.accept(),并用接收到的socket创建BlobServerConnection;如果当前activeConnections的数量达到或超过了maxConnections,则会循环调用activeConnections.wait(2000)进行等待,直到连接数低于maxConnections后才将连接维护到activeConnections,然后调用conn.start()
- BlobServerConnection继承了Thread,它的构造器接收clientSocket及blobServer;它覆盖了Thread的run方法,该方法首先从clientSocket读取请求的operation,如果是PUT_OPERATION则调用put方法,如果是GET_OPERATION则调用get方法;put方法从inputStream读取jobId及blobType,之后创建incomingFile,将输入的文件先存储到临时文件,然后调用blobServer.moveTempFileToStore方法存储到blob server;get方法从inputStream读取jobId及blobKey,之后调用blobServer.getStorageLocation获取blobFile,然后在持有readLock的情况下调用blobServer.getFileInternal确保文件已拷贝到local store,再将文件长度及内容写入到outputStream;传输完成后,如果blobKey是TransientBlobKey且客户端返回了RETURN_OKAY,则调用blobServer.deleteInternal删除该transient blob