악성코드 분석[Malicious Analysis]
Malicious Analysis

악성코드 분석[Malicious Analysis]

연구소 인턴 2023. 8. 1.
300x250
반응형

<이론>

악성코드 분석

악성코드 분석은 Virus 등으로 인해서 시스템이 피해를 보았을 때 분석하는 기법으로 정적 분석(코드 분석)과 동적 분석(행위 분석), 그리고 자동화 분석이 있다.

 

종류 설명

정적 분석 정적분석은 프로그래밍에 대한 깊은 이해와 x86 Assembly 언어의 개념이 필요하며 분석 중에 악성코드를 실   행하지 않고 디스 어셈블링(Disassembling)과 디 컴파일링(De-compiling)으로 리버스 엔지니어링(Reverse  Engineering)을 수행한 뒤 로우 레벨의 어셈블리 코드를 분석하는 식으로 진행한다. 대부분 악성코드 분석 가는 안전상 이유로 동적 분석 보다는 정적 분석을 수행하지만 일부 악성코드는 안티 디버깅 기능이 있어서    정적 분석을 어렵게 하기도 한다. 그리고 최근에는 x86(32bits)를 별로 사용하고 있지 않아서 동적 분석으로  가는 추세이다.  
동적 분석 동적분석은 악성 코드를 실행시켜서 행위(action/behavior)를 분석하는 것으로 악성 코드가 실행될 때 생기는  시스템의 변화를 본다. 시스템 감염으로 파일 삭제, 레지스트리 변조, 파일 변조, 중요 데이터와 정보 유출 등 을 유발할 수 있기 때문에 테스트 목적으로도 악성 코드를 실제 시스템에 실행해서 시스템에 감염시키는 일 은 매우 위험하다. 이런 이유로 동적 분석은 안전한 환경 안에서 수행되어져야 하기 때문에  샌드박스(Sandbox)를 사용한다. 동적 분석을 통해서 파일 시스템, 레지스트리, 프로세스, 네트워크 통신의 변 화를 모니터링 할 수 있고, 악성코드가 어떻게 작동하는지 이해할 수 있게 된다.
자동화 분석 자동화 분석은 대량의 악성코드를 분석할 때 유용하다. 자동화된 동적 악성코드 분석 도구인 쿠쿠(Cuckoo)    샌드박스를 사용하면 분석 시간을 단축할 수 있다.

 샌드박스는 시스템 손상 없이 악성코드를 분석하게 해주는 기술이다. 메인환경에서 신뢰할 수 없는 프로그램을 실행시키기 어려우므로 코드분석 용으로 제한된 환경을 제공해서 악성코드를 실행시키는 환경을 만드는데 부스터(Booster) 샌드박스 분석기, 제로 와인(Zero Wine), 멀루어(Malheur), 쿠쿠(Cuckoo) 샌드박스 등이 샌드박스로 널리 사용되고 있다.  쿠쿠 샌드박스는 2010년 허니텟 프로젝트 중 구글 썸머 코드 프로젝트에서 시작되어서 2011년 2월 5일에 공식적으로 오픈소스로 배포되기 시작했다. 클라우디오 넥스 과르네라가 처음으로 설계하고 개발했는데 Rapid7(Metasploit, Nessus 도구 등 대부분 보안관련 도구들을 소유권을 가지고 있는 회사)이 주관하는 magnificent7 프로그램에서 첫 번째 라운드를 수상했다. 

쿠쿠 샌드박스는 일반적인 Windows 실행 파일(EXE, COM, BAT), DLL 파일, PDF 문서, MS Office 문서, URL(퓌싱이나 퐈밍 사이트), PHP 스크립트 등 거의 모든 것을 분석할 수 있어서 악성 코드에 의해 생성되는 모든 프로세스와 win32 API 호출 흔적, 악성코드 실행에 의한 파일의 생성과 삭제, 다운로드, 악성코드 프로세스 메모리 덤프, PCAP 포맷에서의 네트워크 트래픽 추적, 악성코드 실행 중에 캡처한 Windows Desktop 스크린 샷, 컴퓨터 전체 메모리 덤프 등이 가능하다. 샌드박스는 악성코드 샘플을 실행하고 분석하는 관리 소프트웨어로써 격리된 가상 시스템 Win7 안에서 악성코드가 실행되는 것을 분석해준다.

쿠쿠 샌드박스 설치는

쿠쿠는 모든 Linux 운영체제에서 설치되는데 여기서는 Ubuntu 16에 설치한다. 
a. 호스트 Windows 7/10 안에 VMware 16 Professional을 설치하고 
b. 그 VMware 안에 가상머신으로 Ubuntu 16.02(18.04)를 설치하고 
c. 그 Ubuntu 안에 VirtaulBox를 설치하고 
d. 또 그 VirtualBox 안에 가상머신으로 Windows 7을 설치해서 
e. 그 Windows 7에서 악성코드 샘플을 실행시키는데 PDF 파일 분석, URL 분석, 그리고 Volatility를 이용한 메모리 포렌식, Yara, Wireshark, Radare, Bokken으로 APT 분석, 그리고 분석에 대한 Reports 등을 수행할 수 있다.
=>결론적으로 악성코드가 Win7에서 실행되고 그 결과를 Ubuntu Cuckoo Sandbox에서 분석해주는 메커니즘이다. 

Cuckoo 샌드박스 구조 

설치에 필요한 몇 가지 사전 점검사항을 알아보자.
▪ 외부에서 가상머신 내로 악성코드 파일 등이 이동되어져야 하므로 외부 호스트 Windows 7/10에 공유 폴더를 생성해서 여러 악성코드나 파일을 넣어두면 가상머신 안으로 이동이 쉽다.
▪ 모든 설치가 완료되면 쿠쿠 Ubuntu 뿐만 아니라 VirtualBox의 Win7도 스냅샷을 찍어두는데 Win7 가상머신에서 하나의 악성코드를 분석한 뒤 원래대로 되돌려야 새로운 악성코드를 실행할 수 있기 때문이다.
▪ 특히 외부 호스트 Win7/10, 그 안에 VMware 16, 또 그 안에 Ubuntu 18과 Cuckoo, 또 Ubuntu 18 안에 VirtualBox, 또 그 안의 Win7이 설치된다. 이 Win7은 악성코드가 들어가서 영향을 끼치는 호스트이므로 외부와 격리되어져야 하므로 네트워크를 Host-only로 해두고, 외부 Ubuntu Cuckoo는 외부 파일을 받거나 피싱 사이트와 같은 악성 웹 사이트에도 들어가야 하므로 네트워크를 NAT으로 해주어야 한다.
Cuckoo에서 악성코드(파일) 분석은 '명령어로 분석하는 방법'과 '대시보드에서 분석하는 방법'이 있는데 
각각 파일분석과 URL 분석이 있다.

<실습>

실습환경 : Ubuntu

악성코드 분석[Malicious Analysis] - undefined - 모든 영역
악성코드 분석[Malicious Analysis] - undefined - 모든 영역
악성코드 분석[Malicious Analysis] - undefined - 모든 영역악성코드 분석[Malicious Analysis] - undefined - 모든 영역악성코드 분석[Malicious Analysis] - undefined - 모든 영역악성코드 분석[Malicious Analysis] - undefined - 모든 영역
악성코드 분석[Malicious Analysis] - undefined - 모든 영역
악성코드 분석[Malicious Analysis] - undefined - 모든 영역
악성코드 분석[Malicious Analysis] - undefined - 모든 영역
악성코드 분석[Malicious Analysis] - undefined - 모든 영역

바탕화면에 메모장으로 agent.py를 만들어 줍니다.

소스코드는 보는바와 같습니다.

사실 이 코드는 제가 짠게 아니고 ㅎㅎ

/home/cuckoo/.cuckoo/agent/agent.py파일을 가져온 것 입니다.

#!/usr/bin/env python
# Copyright (C) 2015-2019 Cuckoo Foundation.
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.

import argparse
import cgi
import io
import json
import os
import platform
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import traceback
import zipfile

import SimpleHTTPServer
import SocketServer

AGENT_VERSION = "0.10"
AGENT_FEATURES = [
    "execpy", "pinning", "logs", "largefile", "unicodepath",
]

sys.stdout = io.BytesIO()
sys.stderr = io.BytesIO()

class MiniHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    server_version = "Cuckoo Agent"

    def do_GET(self):
        request.client_ip, request.client_port = self.client_address
        request.form = {}
        request.files = {}
        request.method = "GET"

        self.httpd.handle(self)

    def do_POST(self):
        environ = {
            "REQUEST_METHOD": "POST",
            "CONTENT_TYPE": self.headers.get("Content-Type"),
        }

        form = cgi.FieldStorage(fp=self.rfile,
                                headers=self.headers,
                                environ=environ)

        request.client_ip, request.client_port = self.client_address
        request.form = {}
        request.files = {}
        request.method = "POST"

        # Another pretty fancy workaround. Since we provide backwards
        # compatibility with the Old Agent we will get an xmlrpc request
        # from the analyzer when the analysis has finished. Now xmlrpc being
        # xmlrpc we're getting text/xml as content-type which cgi does not
        # handle. This check detects when there is no available data rather
        # than getting a hard exception trying to do so.
        if form.list:
            for key in form.keys():
                value = form[key]
                if value.filename:
                    request.files[key] = value.file
                else:
                    request.form[key] = value.value.decode("utf8")

        self.httpd.handle(self)

class MiniHTTPServer(object):
    def __init__(self):
        self.handler = MiniHTTPRequestHandler

        # Reference back to the server.
        self.handler.httpd = self

        self.routes = {
            "GET": [],
            "POST": [],
        }

    def run(self, host="0.0.0.0", port=8000):
        self.s = SocketServer.TCPServer((host, port), self.handler)
        self.s.allow_reuse_address = True
        self.s.serve_forever()

    def route(self, path, methods=["GET"]):
        def register(fn):
            for method in methods:
                self.routes[method].append((re.compile(path + "$"), fn))
            return fn
        return register

    def handle(self, obj):
        if "client_ip" in state and request.client_ip != state["client_ip"]:
            if request.client_ip != "127.0.0.1":
                return
            if obj.path != "/status" or request.method != "POST":
                return

        for route, fn in self.routes[obj.command]:
            if route.match(obj.path):
                ret = fn()
                break
        else:
            ret = json_error(404, message="Route not found")

        ret.init()
        obj.send_response(ret.status_code)
        ret.headers(obj)
        obj.end_headers()

        if isinstance(ret, jsonify):
            obj.wfile.write(ret.json())
        elif isinstance(ret, send_file):
            ret.write(obj.wfile)

    def shutdown(self):
        # BaseServer also features a .shutdown() method, but you can't use
        # that from the same thread as that will deadlock the whole thing.
        self.s._BaseServer__shutdown_request = True

class jsonify(object):
    """Wrapper that represents Flask.jsonify functionality."""
    def __init__(self, **kwargs):
        self.status_code = 200
        self.values = kwargs

    def init(self):
        pass

    def json(self):
        return json.dumps(self.values)

    def headers(self, obj):
        pass

class send_file(object):
    """Wrapper that represents Flask.send_file functionality."""
    def __init__(self, path):
        self.path = path
        self.status_code = 200

    def init(self):
        if not os.path.isfile(self.path):
            self.status_code = 404
            self.length = 0
        else:
            self.length = os.path.getsize(self.path)

    def write(self, sock):
        if not self.length:
            return

        with open(self.path, "rb") as f:
            while True:
                buf = f.read(1024 * 1024)
                if not buf:
                    break

                sock.write(buf)

    def headers(self, obj):
        obj.send_header("Content-Length", self.length)

class request(object):
    form = {}
    files = {}
    client_ip = None
    client_port = None
    method = None
    environ = {
        "werkzeug.server.shutdown": lambda: app.shutdown(),
    }

app = MiniHTTPServer()
state = {}

def json_error(error_code, message):
    r = jsonify(message=message, error_code=error_code)
    r.status_code = error_code
    return r

def json_exception(message):
    r = jsonify(message=message, error_code=500,
                traceback=traceback.format_exc())
    r.status_code = 500
    return r

def json_success(message, **kwargs):
    return jsonify(message=message, **kwargs)

@app.route("/")
def get_index():
    return json_success(
        "Cuckoo Agent!", version=AGENT_VERSION, features=AGENT_FEATURES
    )

@app.route("/status")
def get_status():
    return json_success("Analysis status",
                        status=state.get("status"),
                        description=state.get("description"))

@app.route("/status", methods=["POST"])
def put_status():
    if "status" not in request.form:
        return json_error(400, "No status has been provided")

    state["status"] = request.form["status"]
    state["description"] = request.form.get("description")
    return json_success("Analysis status updated")

@app.route("/logs")
def get_logs():
    return json_success(
        "Agent logs",
        stdout=sys.stdout.getvalue(),
        stderr=sys.stderr.getvalue()
    )

@app.route("/system")
def get_system():
    return json_success("System", system=platform.system())

@app.route("/environ")
def get_environ():
    return json_success("Environment variables", environ=dict(os.environ))

@app.route("/path")
def get_path():
    return json_success("Agent path", filepath=os.path.abspath(__file__))

@app.route("/mkdir", methods=["POST"])
def do_mkdir():
    if "dirpath" not in request.form:
        return json_error(400, "No dirpath has been provided")

    mode = int(request.form.get("mode", 0777))

    try:
        os.makedirs(request.form["dirpath"], mode=mode)
    except:
        return json_exception("Error creating directory")

    return json_success("Successfully created directory")

@app.route("/mktemp", methods=["GET", "POST"])
def do_mktemp():
    suffix = request.form.get("suffix", "")
    prefix = request.form.get("prefix", "tmp")
    dirpath = request.form.get("dirpath")

    try:
        fd, filepath = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dirpath)
    except:
        return json_exception("Error creating temporary file")

    os.close(fd)

    return json_success("Successfully created temporary file",
                        filepath=filepath)

@app.route("/mkdtemp", methods=["GET", "POST"])
def do_mkdtemp():
    suffix = request.form.get("suffix", "")
    prefix = request.form.get("prefix", "tmp")
    dirpath = request.form.get("dirpath")

    try:
        dirpath = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dirpath)
    except:
        return json_exception("Error creating temporary directory")

    return json_success("Successfully created temporary directory",
                        dirpath=dirpath)

@app.route("/store", methods=["POST"])
def do_store():
    if "filepath" not in request.form:
        return json_error(400, "No filepath has been provided")

    if "file" not in request.files:
        return json_error(400, "No file has been provided")

    try:
        with open(request.form["filepath"], "wb") as f:
            shutil.copyfileobj(request.files["file"], f, 10*1024*1024)
    except:
        return json_exception("Error storing file")

    return json_success("Successfully stored file")

@app.route("/retrieve", methods=["POST"])
def do_retrieve():
    if "filepath" not in request.form:
        return json_error(400, "No filepath has been provided")

    return send_file(request.form["filepath"])

@app.route("/extract", methods=["POST"])
def do_extract():
    if "dirpath" not in request.form:
        return json_error(400, "No dirpath has been provided")

    if "zipfile" not in request.files:
        return json_error(400, "No zip file has been provided")

    try:
        with zipfile.ZipFile(request.files["zipfile"], "r") as archive:
            archive.extractall(request.form["dirpath"])
    except:
        return json_exception("Error extracting zip file")

    return json_success("Successfully extracted zip file")

@app.route("/remove", methods=["POST"])
def do_remove():
    if "path" not in request.form:
        return json_error(400, "No path has been provided")

    try:
        if os.path.isdir(request.form["path"]):
            # Mark all files as readable so they can be deleted.
            for dirpath, _, filenames in os.walk(request.form["path"]):
                for filename in filenames:
                    os.chmod(os.path.join(dirpath, filename), stat.S_IWRITE)

            shutil.rmtree(request.form["path"])
            message = "Successfully deleted directory"
        elif os.path.isfile(request.form["path"]):
            os.chmod(request.form["path"], stat.S_IWRITE)
            os.remove(request.form["path"])
            message = "Successfully deleted file"
        else:
            return json_error(404, "Path provided does not exist")
    except:
        return json_exception("Error removing file or directory")

    return json_success(message)

@app.route("/execute", methods=["POST"])
def do_execute():
    if "command" not in request.form:
        return json_error(400, "No command has been provided")

    # Execute the command asynchronously? As a shell command?
    async = "async" in request.form
    shell = "shell" in request.form

    cwd = request.form.get("cwd")
    stdout = stderr = None

    try:
        if async:
            subprocess.Popen(request.form["command"], shell=shell, cwd=cwd)
        else:
            p = subprocess.Popen(
                request.form["command"], shell=shell, cwd=cwd,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            stdout, stderr = p.communicate()
    except:
        return json_exception("Error executing command")

    return json_success("Successfully executed command",
                        stdout=stdout, stderr=stderr)

@app.route("/execpy", methods=["POST"])
def do_execpy():
    if "filepath" not in request.form:
        return json_error(400, "No Python file has been provided")

    # Execute the command asynchronously? As a shell command?
    async = "async" in request.form

    cwd = request.form.get("cwd")
    stdout = stderr = None

    args = [
        sys.executable,
        request.form["filepath"],
    ]

    try:
        if async:
            subprocess.Popen(args, cwd=cwd)
        else:
            p = subprocess.Popen(args, cwd=cwd,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
    except:
        return json_exception("Error executing command")

    return json_success("Successfully executed command",
                        stdout=stdout, stderr=stderr)

@app.route("/pinning")
def do_pinning():
    if "client_ip" in state:
        return json_error(500, "Agent has already been pinned to an IP!")

    state["client_ip"] = request.client_ip
    return json_success("Successfully pinned Agent",
                        client_ip=request.client_ip)

@app.route("/kill")
def do_kill():
    shutdown = request.environ.get("werkzeug.server.shutdown")
    if shutdown is None:
        return json_error(500, "Not running with the Werkzeug server")

    shutdown()
    return json_success("Quit the Cuckoo Agent")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("host", nargs="?", default="0.0.0.0")
    parser.add_argument("port", nargs="?", default="8000")
    args = parser.parse_args()

    app.run(host=args.host, port=int(args.port))

 

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

그다음 네트워크를 vlan에 맞추어 줍니다.

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

cuckoo의 웹서버를 실행해주고

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

cuckoo를 실행해 줍니다.

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

firefox http://127.0.0.1:8000을 하면 웹으로 접속이 가능한데, 저는 명령어로 분석을 진행하겠습니다.

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

cuckoo submit --platform windows --package doc /home/cuckoo/Desktop/malware_doc/2020-03-20-IcedID-IOCs.doc[분석할 파일]을 통해 분석을 진행하겠습니다.

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

실행결과로 virtualbox가 열리고 화면이 나온는데 agent를 실행해주면 분석을 시작합니다.

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

분석결과가 파일로 만들어집니다.

 

악성코드 분석[Malicious Analysis] - undefined - 모든 영역

결과창을 웹으로 실행시켜줍니다.

악성코드 분석[Malicious Analysis] - undefined - 모든 영역
악성코드 분석[Malicious Analysis] - undefined - 모든 영역

이렇게 분석된 화면이 나오고 screen shot도 나오게 됩니다.

이 파일은 이상이 없는 파일이네요.

300x250
반응형

댓글