A Look at Flink's BlobServer
Published: 2019-06-18

This article looks at how Flink's BlobServer works, based on the Flink 1.7.2 sources.

BlobServer

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java

public class BlobServer extends Thread implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService {

    /** The log object used for debugging. */
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

    /** Counter to generate unique names for temporary files. */
    private final AtomicLong tempFileCounter = new AtomicLong(0);

    /** The server socket listening for incoming connections. */
    private final ServerSocket serverSocket;

    /** Blob Server configuration. */
    private final Configuration blobServiceConfiguration;

    /** Indicates whether a shutdown of server component has been requested. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Root directory for local file storage. */
    private final File storageDir;

    /** Blob store for distributed file storage, e.g. in HA. */
    private final BlobStore blobStore;

    /** Set of currently running threads. */
    private final Set<BlobServerConnection> activeConnections = new HashSet<>();

    /** The maximum number of concurrent connections. */
    private final int maxConnections;

    /** Lock guarding concurrent file accesses. */
    private final ReadWriteLock readWriteLock;

    /**
     * Shutdown hook thread to ensure deletion of the local storage directory.
     */
    private final Thread shutdownHook;

    // --------------------------------------------------------------------------------------------

    /**
     * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link
     * #getFile} methods.
     **/
    private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes = new ConcurrentHashMap<>();

    /** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    private final long cleanupInterval;

    /**
     * Timer task to execute the cleanup at regular intervals.
     */
    private final Timer cleanupTimer;

    /**
     * Instantiates a new BLOB server and binds it to a free network port.
     *
     * @param config Configuration to be used to instantiate the BlobServer
     * @param blobStore BlobStore to store blobs persistently
     *
     * @throws IOException
     *         thrown if the BLOB server cannot bind to a free network port or if the
     *         (local or distributed) file storage cannot be created or is not usable
     */
    public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
        this.blobServiceConfiguration = checkNotNull(config);
        this.blobStore = checkNotNull(blobStore);
        this.readWriteLock = new ReentrantReadWriteLock();

        // configure and create the storage directory
        this.storageDir = BlobUtils.initLocalStorageDirectory(config);
        LOG.info("Created BLOB server storage directory {}", storageDir);

        // configure the maximum number of concurrent connections
        final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
        if (maxConnections >= 1) {
            this.maxConnections = maxConnections;
        } else {
            LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
                maxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue());
            this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
        }

        // configure the backlog of connections
        int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
        if (backlog < 1) {
            LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}",
                backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue());
            backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
        }

        // Initializing the clean up task
        this.cleanupTimer = new Timer(true);

        this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
        this.cleanupTimer
            .schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(),
                storageDir, LOG), cleanupInterval, cleanupInterval);

        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

        // ----------------------- start the server -------------------

        final String serverPortRange = config.getString(BlobServerOptions.PORT);
        final Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);

        final ServerSocketFactory socketFactory;
        if (SSLUtils.isInternalSSLEnabled(config) && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
            try {
                socketFactory = SSLUtils.createSSLServerSocketFactory(config);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSL for the blob server", e);
            }
        } else {
            socketFactory = ServerSocketFactory.getDefault();
        }

        final int finalBacklog = backlog;
        this.serverSocket = NetUtils.createSocketFromPorts(ports,
            (port) -> socketFactory.createServerSocket(port, finalBacklog));

        if (serverSocket == null) {
            throw new IOException("Unable to open BLOB Server in specified port range: " + serverPortRange);
        }

        // start the server thread
        setName("BLOB Server listener at " + getPort());
        setDaemon(true);

        if (LOG.isInfoEnabled()) {
            LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}",
                serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, backlog);
        }
    }

    //......

    @Override
    public void run() {
        try {
            while (!this.shutdownRequested.get()) {
                BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
                try {
                    synchronized (activeConnections) {
                        while (activeConnections.size() >= maxConnections) {
                            activeConnections.wait(2000);
                        }
                        activeConnections.add(conn);
                    }

                    conn.start();
                    conn = null;
                } finally {
                    if (conn != null) {
                        conn.close();
                        synchronized (activeConnections) {
                            activeConnections.remove(conn);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            if (!this.shutdownRequested.get()) {
                LOG.error("BLOB server stopped working. Shutting down", t);

                try {
                    close();
                } catch (Throwable closeThrowable) {
                    LOG.error("Could not properly close the BlobServer.", closeThrowable);
                }
            }
        }
    }

    /**
     * Shuts down the BLOB server.
     */
    @Override
    public void close() throws IOException {
        cleanupTimer.cancel();

        if (shutdownRequested.compareAndSet(false, true)) {
            Exception exception = null;

            try {
                this.serverSocket.close();
            } catch (IOException ioe) {
                exception = ioe;
            }

            // wake the thread up, in case it is waiting on some operation
            interrupt();

            try {
                join();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();

                LOG.debug("Error while waiting for this thread to die.", ie);
            }

            synchronized (activeConnections) {
                if (!activeConnections.isEmpty()) {
                    for (BlobServerConnection conn : activeConnections) {
                        LOG.debug("Shutting down connection {}.", conn.getName());
                        conn.close();
                    }
                    activeConnections.clear();
                }
            }

            // Clean up the storage directory
            try {
                FileUtils.deleteDirectory(storageDir);
            } catch (IOException e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            // Remove shutdown hook to prevent resource leaks
            ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

            if (LOG.isInfoEnabled()) {
                LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
            }

            ExceptionUtils.tryRethrowIOException(exception);
        }
    }

    //......
}
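  • BlobServer extends Thread and implements the BlobService, BlobWriter, PermanentBlobService, and TransientBlobService interfaces.
  • Its constructor creates the ServerSocket through a ServerSocketFactory: ServerSocketFactory.getDefault() in the plain case, or the factory returned by SSLUtils.createSSLServerSocketFactory when both internal SSL and BlobServerOptions.SSL_ENABLED are turned on. It also schedules a TransientBlobCleanupTask on a timer and registers a shutdown hook via ShutdownHookUtil.addShutdownHook, so that close() is called when the JVM shuts down.
  • It overrides Thread.run(). Until a shutdown is requested, the loop blocks on serverSocket.accept() and wraps each accepted socket in a BlobServerConnection. While activeConnections.size() >= maxConnections it waits on activeConnections in 2000 ms steps; once a slot is free it adds the connection to activeConnections and calls conn.start().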
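
The connection limit in run() is a plain monitor pattern: check the condition in a loop, wait with a timeout, register once there is room. The following is a minimal sketch of that pattern in isolation, not Flink code. The class ConnectionGate and its methods are hypothetical names, and the notifyAll() in unregister is an assumption about what the counterpart of the wait should do, since the excerpt above elides BlobServer.unregisterConnection.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for the activeConnections bookkeeping; not part of Flink. */
class ConnectionGate {
    private final Set<Thread> active = new HashSet<>();
    private final int maxConnections;

    ConnectionGate(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /** Blocks until a slot is free, then registers the connection. */
    void register(Thread conn) throws InterruptedException {
        synchronized (active) {
            // Re-check the condition after every wake-up, as BlobServer.run() does.
            while (active.size() >= maxConnections) {
                active.wait(2000);
            }
            active.add(conn);
        }
    }

    /** Assumed counterpart: frees the slot and wakes up any waiting acceptor. */
    void unregister(Thread conn) {
        synchronized (active) {
            active.remove(conn);
            active.notifyAll();
        }
    }

    /** Number of currently registered connections. */
    int size() {
        synchronized (active) {
            return active.size();
        }
    }
}
```

Because the wait is timed and wrapped in a loop, the acceptor would still make progress after at most 2000 ms even if a notification were missed; the notification only shortens the delay.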

BlobServerConnection

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java

class BlobServerConnection extends Thread {

    /** The log object used for debugging. */
    private static final Logger LOG = LoggerFactory.getLogger(BlobServerConnection.class);

    /** The socket to communicate with the client. */
    private final Socket clientSocket;

    /** The BLOB server. */
    private final BlobServer blobServer;

    /** Read lock to synchronize file accesses. */
    private final Lock readLock;

    BlobServerConnection(Socket clientSocket, BlobServer blobServer) {
        super("BLOB connection for " + clientSocket.getRemoteSocketAddress());
        setDaemon(true);

        this.clientSocket = clientSocket;
        this.blobServer = checkNotNull(blobServer);

        ReadWriteLock readWriteLock = blobServer.getReadWriteLock();

        this.readLock = readWriteLock.readLock();
    }

    // --------------------------------------------------------------------------------------------
    //  Connection / Thread methods
    // --------------------------------------------------------------------------------------------

    /**
     * Main connection work method. Accepts requests until the other side closes the connection.
     */
    @Override
    public void run() {
        try {
            final InputStream inputStream = this.clientSocket.getInputStream();
            final OutputStream outputStream = this.clientSocket.getOutputStream();

            while (true) {
                // Read the requested operation
                final int operation = inputStream.read();
                if (operation < 0) {
                    // done, no one is asking anything from us
                    return;
                }

                switch (operation) {
                case PUT_OPERATION:
                    put(inputStream, outputStream, new byte[BUFFER_SIZE]);
                    break;
                case GET_OPERATION:
                    get(inputStream, outputStream, new byte[BUFFER_SIZE]);
                    break;
                default:
                    throw new IOException("Unknown operation " + operation);
                }
            }
        }
        catch (SocketException e) {
            // this happens when the remote site closes the connection
            LOG.debug("Socket connection closed", e);
        }
        catch (Throwable t) {
            LOG.error("Error while executing BLOB connection.", t);
        }
        finally {
            closeSilently(clientSocket, LOG);
            blobServer.unregisterConnection(this);
        }
    }

    /**
     * Closes the connection socket and lets the thread exit.
     */
    public void close() {
        closeSilently(clientSocket, LOG);
        interrupt();
    }

    // --------------------------------------------------------------------------------------------
    //  Actions
    // --------------------------------------------------------------------------------------------

    private void get(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
        /*
         * Retrieve the file from the (distributed?) BLOB store and store it
         * locally, then send it to the service which requested it.
         *
         * Instead, we could send it from the distributed store directly but
         * chances are high that if there is one request, there will be more
         * so a local cache makes more sense.
         */

        final File blobFile;
        final JobID jobId;
        final BlobKey blobKey;

        try {
            // read HEADER contents: job ID, key, HA mode/permanent or transient BLOB
            final int mode = inputStream.read();
            if (mode < 0) {
                throw new EOFException("Premature end of GET request");
            }

            // Receive the jobId and key
            if (mode == JOB_UNRELATED_CONTENT) {
                jobId = null;
            } else if (mode == JOB_RELATED_CONTENT) {
                byte[] jidBytes = new byte[JobID.SIZE];
                readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
                jobId = JobID.fromByteArray(jidBytes);
            } else {
                throw new IOException("Unknown type of BLOB addressing: " + mode + '.');
            }
            blobKey = BlobKey.readFromInputStream(inputStream);

            checkArgument(blobKey instanceof TransientBlobKey || jobId != null,
                "Invalid BLOB addressing for permanent BLOBs");

            if (LOG.isDebugEnabled()) {
                LOG.debug("Received GET request for BLOB {}/{} from {}.", jobId,
                    blobKey, clientSocket.getInetAddress());
            }

            // the file's (destined) location at the BlobServer
            blobFile = blobServer.getStorageLocation(jobId, blobKey);

            // up to here, an error can give a good message
        }
        catch (Throwable t) {
            LOG.error("GET operation from {} failed.", clientSocket.getInetAddress(), t);
            try {
                writeErrorToStream(outputStream, t);
            }
            catch (IOException e) {
                // since we are in an exception case, it means that we could not send the error
                // ignore this
            }
            clientSocket.close();
            return;
        }

        try {
            readLock.lock();
            try {
                // copy the file to local store if it does not exist yet
                try {
                    blobServer.getFileInternal(jobId, blobKey, blobFile);

                    // enforce a 2GB max for now (otherwise the protocol's length field needs to be increased)
                    if (blobFile.length() > Integer.MAX_VALUE) {
                        throw new IOException("BLOB size exceeds the maximum size (2 GB).");
                    }

                    outputStream.write(RETURN_OKAY);
                } catch (Throwable t) {
                    LOG.error("GET operation failed for BLOB {}/{} from {}.", jobId,
                        blobKey, clientSocket.getInetAddress(), t);
                    try {
                        writeErrorToStream(outputStream, t);
                    } catch (IOException e) {
                        // since we are in an exception case, it means that we could not send the error
                        // ignore this
                    }
                    clientSocket.close();
                    return;
                }

                // from here on, we started sending data, so all we can do is close the connection when something happens
                int blobLen = (int) blobFile.length();
                writeLength(blobLen, outputStream);

                try (FileInputStream fis = new FileInputStream(blobFile)) {
                    int bytesRemaining = blobLen;
                    while (bytesRemaining > 0) {
                        int read = fis.read(buf);
                        if (read < 0) {
                            throw new IOException("Premature end of BLOB file stream for " +
                                blobFile.getAbsolutePath());
                        }
                        outputStream.write(buf, 0, read);
                        bytesRemaining -= read;
                    }
                }
            } finally {
                readLock.unlock();
            }

            // on successful transfer, delete transient files
            int result = inputStream.read();
            if (result < 0) {
                throw new EOFException("Premature end of GET request");
            } else if (blobKey instanceof TransientBlobKey && result == RETURN_OKAY) {
                // ignore the result from the operation
                if (!blobServer.deleteInternal(jobId, (TransientBlobKey) blobKey)) {
                    LOG.warn("DELETE operation failed for BLOB {}/{} from {}.", jobId,
                        blobKey, clientSocket.getInetAddress());
                }
            }

        } catch (SocketException e) {
            // happens when the other side disconnects
            LOG.debug("Socket connection closed", e);
        } catch (Throwable t) {
            LOG.error("GET operation failed", t);
            clientSocket.close();
        }
    }

    private void put(InputStream inputStream, OutputStream outputStream, byte[] buf) throws IOException {
        File incomingFile = null;

        try {
            // read HEADER contents: job ID, HA mode/permanent or transient BLOB
            final int mode = inputStream.read();
            if (mode < 0) {
                throw new EOFException("Premature end of PUT request");
            }

            final JobID jobId;
            if (mode == JOB_UNRELATED_CONTENT) {
                jobId = null;
            } else if (mode == JOB_RELATED_CONTENT) {
                byte[] jidBytes = new byte[JobID.SIZE];
                readFully(inputStream, jidBytes, 0, JobID.SIZE, "JobID");
                jobId = JobID.fromByteArray(jidBytes);
            } else {
                throw new IOException("Unknown type of BLOB addressing.");
            }

            final BlobKey.BlobType blobType;
            {
                final int read = inputStream.read();
                if (read < 0) {
                    throw new EOFException("Read an incomplete BLOB type");
                } else if (read == TRANSIENT_BLOB.ordinal()) {
                    blobType = TRANSIENT_BLOB;
                } else if (read == PERMANENT_BLOB.ordinal()) {
                    blobType = PERMANENT_BLOB;
                    checkArgument(jobId != null, "Invalid BLOB addressing for permanent BLOBs");
                } else {
                    throw new IOException("Invalid data received for the BLOB type: " + read);
                }
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT request for BLOB of job {} with from {}.", jobId,
                    clientSocket.getInetAddress());
            }

            incomingFile = blobServer.createTemporaryFilename();
            byte[] digest = readFileFully(inputStream, incomingFile, buf);

            BlobKey blobKey = blobServer.moveTempFileToStore(incomingFile, jobId, digest, blobType);

            // Return computed key to client for validation
            outputStream.write(RETURN_OKAY);
            blobKey.writeToOutputStream(outputStream);
        }
        catch (SocketException e) {
            // happens when the other side disconnects
            LOG.debug("Socket connection closed", e);
        }
        catch (Throwable t) {
            LOG.error("PUT operation failed", t);
            try {
                writeErrorToStream(outputStream, t);
            }
            catch (IOException e) {
                // since we are in an exception case, it means not much that we could not send the error
                // ignore this
            }
            clientSocket.close();
        }
        finally {
            if (incomingFile != null) {
                if (!incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn("Cannot delete BLOB server staging file " + incomingFile.getAbsolutePath());
                }
            }
        }
    }

    private static byte[] readFileFully(
            final InputStream inputStream, final File incomingFile, final byte[] buf)
            throws IOException {
        MessageDigest md = BlobUtils.createMessageDigest();

        try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
            while (true) {
                final int bytesExpected = readLength(inputStream);
                if (bytesExpected == -1) {
                    // done
                    break;
                }
                if (bytesExpected > BUFFER_SIZE) {
                    throw new IOException(
                        "Unexpected number of incoming bytes: " + bytesExpected);
                }

                readFully(inputStream, buf, 0, bytesExpected, "buffer");
                fos.write(buf, 0, bytesExpected);

                md.update(buf, 0, bytesExpected);
            }
            return md.digest();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Utilities
    // --------------------------------------------------------------------------------------------

    private static void writeErrorToStream(OutputStream out, Throwable t) throws IOException {
        byte[] bytes = InstantiationUtil.serializeObject(t);
        out.write(RETURN_ERROR);
        writeLength(bytes.length, out);
        out.write(bytes);
    }
}
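  • BlobServerConnection extends Thread; its constructor takes the clientSocket and the blobServer. Its run() method reads one operation byte at a time from the clientSocket: PUT_OPERATION dispatches to put, GET_OPERATION dispatches to get, and any other value raises an IOException. The loop ends when the client closes the stream.
  • put reads the jobId and blobType from the inputStream, creates a temporary incomingFile, writes the incoming chunks to it while computing a digest, and then calls blobServer.moveTempFileToStore to move the file into the BLOB server's storage. It replies with RETURN_OKAY followed by the computed BlobKey.
  • get reads the jobId and blobKey from the inputStream, resolves blobFile through blobServer.getStorageLocation, and calls blobServer.getFileInternal to copy the file into the local store if it is not there yet. Holding the read lock, it then writes RETURN_OKAY, the file length, and the file contents to the outputStream. For a TransientBlobKey, the file is deleted once the client acknowledges the transfer with RETURN_OKAY.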
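
The chunk loop in readFileFully implies a simple framing on the PUT path: each chunk is preceded by its length, and a length of -1 marks the end of the data. The following is a minimal sketch of that framing over in-memory streams. The class ChunkedTransfer, the tiny BUFFER_SIZE, and the helper bodies are assumptions made for illustration: the excerpt above does not show Flink's own readLength/writeLength or createMessageDigest, so the 4-byte little-endian length encoding and the SHA-1 digest used here are stand-ins, not confirmed details.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical illustration of length-prefixed chunking; not Flink's implementation. */
class ChunkedTransfer {
    /** Deliberately tiny so that a short payload spans several chunks. */
    static final int BUFFER_SIZE = 8;

    /** Assumed encoding: 4 bytes, least significant byte first. */
    static void writeLength(int length, OutputStream out) throws IOException {
        out.write(length & 0xff);
        out.write((length >>> 8) & 0xff);
        out.write((length >>> 16) & 0xff);
        out.write((length >>> 24) & 0xff);
    }

    static int readLength(InputStream in) throws IOException {
        byte[] b = new byte[4];
        readFully(in, b, 4);
        return (b[0] & 0xff) | (b[1] & 0xff) << 8 | (b[2] & 0xff) << 16 | (b[3] & 0xff) << 24;
    }

    /** Reads exactly len bytes or fails with EOFException. */
    static void readFully(InputStream in, byte[] buf, int len) throws IOException {
        int off = 0;
        while (off < len) {
            int n = in.read(buf, off, len - off);
            if (n < 0) {
                throw new EOFException("Premature end of stream");
            }
            off += n;
        }
    }

    /** Client side: split the payload into chunks and finish with the -1 marker. */
    static void send(byte[] data, OutputStream out) throws IOException {
        for (int off = 0; off < data.length; off += BUFFER_SIZE) {
            int n = Math.min(BUFFER_SIZE, data.length - off);
            writeLength(n, out);
            out.write(data, off, n);
        }
        writeLength(-1, out);
    }

    /** Server side: copy the chunks to sink and return the digest of the payload. */
    static byte[] receive(InputStream in, OutputStream sink)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-1"); // assumed algorithm
        byte[] buf = new byte[BUFFER_SIZE];
        while (true) {
            int expected = readLength(in);
            if (expected == -1) {
                break; // end-of-data marker
            }
            // The lower-bound check is an addition of this sketch.
            if (expected < 0 || expected > BUFFER_SIZE) {
                throw new IOException("Unexpected number of incoming bytes: " + expected);
            }
            readFully(in, buf, expected);
            sink.write(buf, 0, expected);
            md.update(buf, 0, expected);
        }
        return md.digest();
    }
}
```

Computing the digest while the chunks are written means the payload is read only once, which is why put can hand the digest to moveTempFileToStore right after the upload completes.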

Summary

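  • BlobServer extends Thread and implements the BlobService, BlobWriter, PermanentBlobService, and TransientBlobService interfaces. Its constructor creates the ServerSocket through a ServerSocketFactory (the default one, or an SSL factory when SSL is enabled for the BLOB server) and registers a shutdown hook via ShutdownHookUtil.addShutdownHook, so that close() is called when the JVM shuts down.
  • BlobServer overrides Thread.run(). Until a shutdown is requested, it loops on serverSocket.accept() and creates a BlobServerConnection per socket. While activeConnections.size() >= maxConnections it waits in 2000 ms steps; then it adds the connection to activeConnections and calls conn.start().
  • BlobServerConnection extends Thread; its constructor takes the clientSocket and the blobServer, and its run() method dispatches on the operation byte read from the clientSocket: PUT_OPERATION to put, GET_OPERATION to get. put reads the jobId and blobType, stores the incoming data in a temporary incomingFile, and calls blobServer.moveTempFileToStore to move it into the BLOB server's storage. get reads the jobId and blobKey, resolves blobFile through blobServer.getStorageLocation, copies it into the local store if needed, and writes it to the outputStream.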
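
BlobServer.close() can be reached from several places: an explicit call, the failure path in run(), and the shutdown hook. The compareAndSet guard is what makes that safe. The following is a minimal sketch of the guard using only the JDK, not Flink code: the class IdempotentServer and its cleanup counter are hypothetical, and Runtime.addShutdownHook/removeShutdownHook stand in for Flink's ShutdownHookUtil.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of an idempotent close() with a shutdown hook; not Flink code. */
class IdempotentServer implements AutoCloseable {
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();
    private final AtomicInteger cleanups = new AtomicInteger();
    private final Thread shutdownHook;

    IdempotentServer() {
        // The hook calls close(), just like an explicit caller would.
        shutdownHook = new Thread(this::close, "IdempotentServer shutdown hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    @Override
    public void close() {
        // Only the first caller wins the compare-and-set and performs the cleanup.
        if (shutdownRequested.compareAndSet(false, true)) {
            cleanups.incrementAndGet(); // placeholder for closing sockets, deleting files, ...
            try {
                // Remove the hook so it does not keep this object reachable.
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException e) {
                // The JVM is already shutting down; the hook cannot be removed any more.
            }
        }
    }

    /** How many times the cleanup body actually ran. */
    int cleanupCount() {
        return cleanups.get();
    }
}
```

Calling close() twice on such an object runs the cleanup body once; the second call is a no-op.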

Reposted from: http://hemyx.baihongyu.com/