도수리

전 직장 동료, 친구 등과 함께 제대로 각 잡고 사이드 프로젝트를 하고 있다.

 

이 참에 광고를 좀 하자면, 도수리라는 서비스를 개발, 운영 중이다.

도수리도수치료를 제공하는 전국 병원의  가격, 의료진, 치료 후기를 볼 수 있는 플랫폼으로, 커뮤니티 또한 제공하여 실 사용자들이 사진이 포함된 솔직한 후기를 등록하고 공유할 수 있는 서비스이다.

 

부하 테스트

사이드 프로젝트 답게, 서버 비용의 압박을 몸으로 직접 느끼고 있다. 자연스럽게 ec2 인스턴스도 최대한 작은걸 쓰고 있는데, 대신 오토스케일링을 걸어서 트래픽이 몰리면 인스턴스를 늘리도록 세팅해두었다.

 

그렇다고 해도 인스턴스를 추가로 띄우는 작업은 비교적 무거운 작업이기 때문에 시간이 어느정도 소요될 수 밖에 없고, 그 전에는 기존 인스턴스가 몸빵을 해줘야 한다. 

 

이러한 이유로, 기본 인스턴스가 얼만큼의 트래픽을 버텨줄 수 있는지 locust 를 통해 테스트 해보려고 한다.

사용법은 워낙 간단하다. 

 

pip 명령어로 설치를 해주고,

pip install locust

 

스크립트는 아래와 같이 간단하게 작성할 수 있는데, 자세한 사용법은 locust 의 man page를 참고하자.

from locust import HttpUser, task


class TrafficTest(HttpUser):
    @task
    def test_landing_api(self):
        self.client.get("/test/api")

locust 명령어를 실행하면 8089 포트로 아래와 같이 테스트를 실행할 수 있는 대시보드가 뜬다. 

Number of users는 최대 유저 수

Spawn rate는 한번에 유저가 생성되는 수

Host 부하 테스트할 서버 주소

100의 Number of users, 1의 Spawn rate 테스트를 해본 결과 아래와 같이 차트로 추세를 확인할 수 있었다.

 

 1의 Spawn rate로 인해 동시에 요청하는 유저의 수가 1, 2, 3, ..., 100 으로 점차 증가했고, 증가함에 따라 리스폰스 타임이 지속적으로 늘어나 동접 유저가 17명이 됐을 때 부터는  api 응답이 평균적으로 1초가 넘게 걸리는 것이 보인다. 이 시점부터 유저들이 불편함을 느끼지 않을까 싶다.

 

해당 시점의 ec2 모니터링 지표를 보면, 사실 CPU가 70% 넘게 피크를 치고 있는건 아닌데 Network I/O 에서 부하가 걸리는것 같기도 하다.

이 지표를 가지고 오토 스케일링의 민감도를 조절해서 유저의 불편함을 줄이도록 인프라 구성을 잘 해봐야 겠다 하는 생각은 드는데, 동시에 기본으로 떠 있는 인스턴스가 동접자 몇 명 까지 몸빵을 해줄 수 있어야 할까 하는 부분에 대해서는 잘 모르겠다.

 

지금 당장 트래픽이 많이 몰리지는 않으니 큰 무리는 없으나, 서비스가 성장함에 따라 지속적으로 고민해야할 부분인듯 하다.

임시 방편으로 트래픽이 몰리면 알람이 오도록 Cloud Watch 구성은 해두었으니, 추이를 지켜보자.

 

서비스 운영하는게 이렇게 어려울 줄이야. 근데 그 만큼 재밌기도 하다.

 

 Referecnes

https://www.dosuri.site/

 

도수리

도수 통증치료 병원정보는 도수리

www.dosuri.site

https://locust.io/

 

Locust.io

An open source load testing tool. Define user behaviour with Python code, and swarm your system with millions of simultaneous users.

locust.io

https://docs.locust.io/en/stable/

 

Locust Documentation — Locust 2.15.1 documentation

© Copyright . Revision 350d3041.

docs.locust.io

https://wookkl.tistory.com/67

 

부하테스트 - Locust (설치 및 스크립트 작성)

들어가며 파이썬으로 작성된 Locust라는 툴을 설치해서 사용하는 방법을 알아보고 웹 애플리케이션에 부하 테스트를 적용해본다. 참고 사항: locust는 영어로 메뚜기라는 뜻인데, 이 툴에서 부하를

wookkl.tistory.com

 

RabbitMQ

RabbitMQ는 Message Queue의 일종으로, Message Queue에 대한 내용은 celery + Mesage Queue 사용하기 포스트에서 간단히 다뤘으므로, 이번 포스트에서는 Message Queue 자체에 대한 특성은 다루지 않으려 한다.

 

파이썬으로 애플리케이션을 개발하다 보면 비동기 처리를 해야할 때가 있는데, 그럴 때 나는 메시지큐, 그 중에서도 RabbitMQ를 주로 사용하는 편이다.

 

RabbitMQ 는 AMQP 프로토콜을 베이스로 지원하며(플러그인을 설치하면 MQTT 등도 지원한다), python 에서는 AMQP로 메시지를 송수신 할 수 있게 해주는 pika 라이브러리를 사용할 수 있다.

 

pika를 사용하기에 앞서 RabbitMQ와 AMQP에 대해 간략하게 알아보고 넘어가자.

RabbitMQ는 조금 특이하게, Message 를 Publish 하게 되면 바로 Queue에 들어가지 않고 Exchange라는 곳을 통하게 된다. 그리고 Exchange Type  Binding 규칙에 따라 적절한 Queue 로 전달된다. 이후, Queue와 Connection을 맺고 Consume을 하는 형태로 작업이 진행된다.

 

이 과정을 수행할 때 알아야 할 용어들은 다음과 같다.

 

Exchange Type

1) Direct exchange
1:1 관계로, Exchange에 바인딩 된 Queue 중에서 메시지의 라우팅 키와 매핑되어 있는 Queue로 메시지를 전달한다
 
2) Fanout exchange
1:N 관계로, 메시지의 라우팅 키를 무시하고 Exchange에 바인딩 된 모든 Queue에 메시지를 전달한다
 
3) Topic exchange

마찬가지로 1:N 관계이지만, Exchange에 바인딩 된 Queue 중에서 메시지의 라우팅 키가 패턴에 맞는 Queue에게만 모두 메시지를 전달한다.


4) Headers exchange
라우팅 키 대신 메시지 헤더에 여러 속성들을 더해 속성들이 매칭되는 큐에 메시지를 전달한다.

 

Binding

생성된 Exchange 에는 전달 받은 메시지를 원하는 Queue 로 전달하기 위해 정의하는 규칙이다.

direct exchange의 경우 Queue 이름만으로 Binding 할 수도 있고, topic exchange의 경우 routing key 를 지정해서 메시지를 필터링 한 후 지정한 Queue 에만 보내도록 Binding 할 수 있다.

 

Connection

 

  • RabbitMQ에서 지원하는 모든 프로토콜은 TCP 기반이다.
  • 효율성을 위해 긴 연결을 가정한다. (프로토콜 작업당 새 연결이 열리지 않음.)
  • 하나의 클라이언트 연결은 단일 TCP 연결을 사용한다.
  • 연결이 더 이상 필요하지 않은 경우, 리소스 절약을 위해 연결을 닫아야 한다. 이를 수행하지 못하는 클라이언트는 리소스의 대상 노드를 고갈시킬 위험이 있다.

 

Channel

RabbitMQ는 Connection 외에도, Channel 이라는 특이한 개념을 가지고 있다. 

  • 단일 TCP 연결을 공유하는 논리적인 개념의 경량 연결로 다중화된다.
  • 클라이언트가 수행하는 모든 프로토콜 작업은 채널에서 발생한다.
  • 채널 안에 연결할 Queue를 선언할 수 있으며, 채널 하나당 하나의 Queue만 선언이 가능하다.
  • 특정 채널의 통신은 다른 채널의 통신과 완전히 분리되어 있기 때문에 프로토콜은 채널 ID와 같은 식별자를 포함시켜 전달한다.
  • 채널 ID를 통해 클라이언트나 브로커 모두 채널에 대한 파악이 가능하다.
  • 채널은 Connection Context에만 존재하기 때문에 Connection이 닫히면, 연결된 모든 채널도 닫힌다.
  • 클라이언트에서 처리를 위해 멀티 프로세스/스레드를 사용한다면, 프로세스/스레드 별로 새 채널을 열고 공유하지 않는 것이 일반적이다.

Connection 과 Channel 의 관계를 정리하면,

  • Connection은 물리적인 연결이다.
  • Connection은 단일 TCP 연결만 가능하다.
  • Channel은 Connection Context를 공유하며, 하나 이상의 경량 연결이 가능하다.
  • Channel 하나당 하나의 Queue만 연결이 가능하다.

 

Pika

pika의 공식문서를 보면 위에서 적어 놓은 AMQP의 개념들을 class와 method 로 감싸서 추상화 해놓은 것을 볼 수 있다.

 

Subscribe

 BlockingConnection 을 맺어 queue 와 연결을 맺고, message를 받아 처리하는 예제 코드이다. 

import pika

class AmqpClient:
    def __init__(self, broker_url, queue):
        self.queue = queue
        parsed = self.parse_url(broker_url_or_connection)
        self.address = parsed['address']
        self.username = parsed['username']
        self.password = parsed['password']
        self.port = parsed['port']
        
        self.connection = self.connect()
        self.channel = self.assign_channel()

	# amqp://guest:guest@localhost:5672 등의 url을 받아 파싱
    def parse_url(self, url):
        scheme, rest = url.split('://')
        assert scheme in ['amqp', 'amqps']

        auth, rest = rest.split('@')
        username, password = auth.split(':')

        if ':' in rest:
            address, port = rest.split(':')
        else:
            address = rest
            port = '5432'
        port = int(port)
        return {
            'scheme': scheme, 'username': username, 'password': password,
            'address': address, 'port': port
        }

	# BlockingConnection 연결, channel 할당은 추가적으로 필요
    def connect(self):
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(host=self.address, port=self.port, credentials=credentials)
        connection = pika.BlockingConnection(parameters)

        return connection

	# channel 할당 및 queue 선언
    def assign_channel(self, connection):
        channel = connection.channel()
        channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False)
        channel.confirm_delivery()

        return channel

	# connection이 묶였는지 확인, 끊겼다면 다시 연결
    def ensure_connection(self, exchange, routing_key):
        try:
            self.channel.queue_bind(self.queue, exchange, routing_key)
        except StreamLostError:
            connection = self.connect()
            self.channel = self.assign_channel(connection)

	# packet을 어떻게 처리할지에 대한 callback 함수
    def on_message(self):
        pass

    def subscribe(self, topic, exchange):
        self.channel.queue_bind(self.queue, exchange, routing_key=topic)
        self.channel.basic_consume(
            queue=self.queue, on_message_callback=self.on_message, auto_ack=True)

        self.start_consuming(self.channel.connection)

	# connection이 끊겼다면 다시 consume을 다시 시작하도록
    def start_consuming(self, connection):
        try:
            while True:
                connection.process_data_events(None)
        except pika.exceptions.ConnectionClosed:
            # Handle connection closed
            # Re-establish the connection and restart the consuming process
            self.start_consuming(connection)

pika를 사용하면서 가장 헷갈렸던 부분이 hearbeat timeout이다. 

pika는 기본적으로 60초의 heartbeat timeout을 가지고 있는데, 이 시간 동안 packet을 보내지 않으면 connection을 더 이상 유지할 필요가 없다고 판단하여 끊어버린다. 이를 해결하기 위해 heartbeat time을 늘리거나 없애는 것도 방법이지만 권장되지 않는다. packet을 보내는 주기가 길다면 connectino을 끊고 packet을 보낼 때 다시 연결하는 것이 자연스러운 듯 하다.

 

위 예제에서는 고려되지 않았지만, packet을 받아서 처리하는 callback method의 처리 속도가 heartbeat timeout을 넘어가는 경우에도, connection을 끊어버리는 문제가 발생할 수 있다. 이 때는 상황이 조금 달라, callback method를 처리하는 동안 sub thread를 할당하여 RabbitMQ 서버에 수동으로 ack를 해주거나, heartbeat timeout 시간을 늘리는 방법을 생각해봐야 한다.

 

Publish

Subscribe 부분에서 정의한 AmqpClient 클래스를 활용하여 topic exchange 타겟으로 메시지를 publish 하는 예제코드이다.

import pika

def main():
    pub_client = AmqpClient(pub_broker_url, queue)
    pub_client.ensure_connection('amq.topic', queue)
    pub_client.channel.basic_publish(
        exchange='amq.topic',
        routing_key=routing_key,
        body=payload,
        properties=props,
    )
    ...

 

References

https://pika.readthedocs.io/en/stable/index.html

 

Introduction to Pika — pika 1.2.1 documentation

© Copyright 2009-2017, Tony Garnock-Jones, Gavin M. Roy, Pivotal Software, Inc and contributors. Revision 741cfc4c.

pika.readthedocs.io

https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/#gsc.tab=0

 

조은우 개발 블로그

조은우 개발자 블로그

jonnung.dev

https://stackoverflow.com/questions/57650669/how-to-change-timeout-using-rabbitmq-pika-basic-consume-in-python

이직을 위해 첫 서류를 쓴지 2달이 조금 안되는 시간 후에 운좋게 한 회사로 이직을 하고 첫 주를 보내게 되었다.

 

사실은 B2C 서비스를 하는 회사에 가고 싶긴 했는데, 소위 말하는 '네임드' 회사들 면접을 보면서 아직 준비가 되지 않았다는 것을 온몸으로 느꼈다 ㅋㅋ..

 

이런 나를 받아주는 회사가 하나라도 있어서 새삼 느끼면서 감사한 마음으로 이직을 하게 되었는데, 사실 면접 경험이 너무 좋아서 꽤나 만족스럽다.

 

첫 주를 보내며 기존에 계시던 서버 개발자분(팀리더)의 코드를 살살 보고 있는데 응애 주니어인 나에게는 너무 수준이 높다.

 

아무래도 당분간은 이직 준비를 할 때보다 빡시게 공부해야하지 않을까 싶다.

 

그 시작으로 파이썬 코드를 이쁘게(?) 적는 스타일을 정리해보려고 한다.

 

1. Enumerate 

list의 데이터와 인덱스를 둘 다 track 해야 할 때는 range 보다 enumerate를 사용하는 것이 성능이 좋고 깔끔하다.

data = [1, 2, -4, -3]
for i in range(len(data)):
	if data[i] < 0:
    	data[i] = 0
        

data = [1, 2, -4, -3]
for idx, val in enumerate(data):
	if num < 0:
    	data[idx] = 0

 

2. List Comprehension

squares = []
for i in range(10):
	squares.append(i * i)
    
squares = [i * i for i in range(10)]

 

3. Sorted

데이터 셋을 정렬하려고 할 때 sort, sorted 중에 선택을 할 수 있다.

둘의 차이점은 sort는 기존의 데이터 셋에서 정렬이 바로 진행되고, sorted는 정렬한 데이터 셋을 반환하는 차이가 있다.

sorted 함수가 더 pythonic 하다고 한다.

'''
sort
'''

data = [1, 3, 5, 7]
data.sort()


'''
sorted
'''

data = [1, 3, 5, 7]
sorted_data = sorted(data)


'''
sorted function extra option
'''

# 내림차순
data = (3, 5, 1, 10, 9)
sorted_data = sorted(data, reverse=True)

# Custom 정렬
data = [{"name": "Max", "age": 6},
		{"name": "Lisa", "age": 20},
        {"name": "Ben", "age": 9}]

sorted_data = sorted(data, key=lambda x: x["age"])

 

4. Set

데이터 셋의 중복을 제거하는 데에는 Set이 최고다.

data = [1, 2, 2, 3, 3, 3, 4, 5, 6, 6]
unique_data = list(set(data))

 

5. Generator

generator comprehension

(syntax: (<expression> for <var> in <iterable> [if <condition>])) 으로 generator 를 생성할 수 있다.

 

genereator는 코드 실행을 lazy하게 실행한다: lazy 하게 실행한다는 것은 선언할 때 바로 코드를 실행하는 것이 아니라, 실제로 사용할 때 실행한다는 것을 의미한다.

이는 generator가 모든 값을 메모리에 담고 있지 않고 그때그때 값을 생성하는 성질을 갖고 있기 때문이다.

 

결과적으로 sum 함수를 사용할 때 list보다 훨씬 적은 메모리를 사용한다.

imort sys

data = [i for i in range(10000)]
sum(data)
print(sys.getsizeof(data), "bytes")

gen = (i for i in range(10000))
sum(gen)
print(sys.getsizeof(gen), "bytes")

'''
87632 bytes
128 bytes
'''

 

6. dict .get(), setdefault()

dict 의 key를 조회할 때 get() 을 사용하여 조회하면, 해당 key가 없을 때 오류를 발생시키는 대신 get 함수로 넘기는 디폴트 값을 가져올 수 있다.

my_dict = {"item": "ball", "price": 10}
count = my_dict.get("count", 0)
print(count)


'''
0
'''

해당 key가 dict에 있는지 없는지 애매한 경우 조건문을 사용할 필요 없이 setdefault()함수를 사용할 수 있다. 기존에 이미 존재하는 key이면 value가 변경되지 않는다.

my_dict = {'a': 1, 'b': 2}
print(my_dict)
my_dict.setdefault('b', 3)
print(my_dict)
my_dict.setdefault('c', 4)
print(my_dict)

'''
{'a': 1, 'b': 2}
{'a': 1, 'b': 2}
{'a': 1, 'b': 2, 'c': 4}
'''

 

7.  collections.Counter

list 안에 있는 특정 데이터의 개수를 알고 싶으면 반복문을 사용하는 것보다 더 좋은 방법이 있다.

from collections import Counter

data = [10, 10, 10, 5, 5, 2, 9, 9, 9, 9, 9, 9]
counter = Counter(data)
print(counter)

most_common = counter.most_common(3)  # 가장 많은 데이터 3개까지 조회
print(most_common)

print(counter[9])
print(counter[0])  # 존재하지 않는 데이터를 검색하면 0개 리턴

'''
Counter({9: 6, 10: 3, 5: 2, 2: 1})
[(9, 6), (10, 3), (5, 2)]
6
0
'''

 

8. f-Strings

string 에 변수를 넣어주는 문법이다. 굉장히 간편하다.

name = "Max"
sample_string = f"Hello {name}"

 

9. string .join()

list를 string으로 합치고 싶을 때 사용한다. list를 반복문으로 돌면서 string에 더해주는 방식보다 성능이 좋다.

list_of_string = ["Hello", "my", "friend"]

# BAD
my_string = ""
for i in list_of_strings:
	my_string += i + " "
    
# GOOD
my_string = " ".join(list)of_strings)

'''
Hello my friend
'''

 

10. merge dict

여러개의 dict를 하나로 합칠 수 있다.

d1 = {"name": "Alex", "age": 25}
d2 = {"name": "Alex", "city": "New York"}

merged_dict = {**d1, **d2}
print(merged_dict)

'''
{'name': 'Alex', 'age': 25, 'city': 'New York'}
'''

 

References

https://www.youtube.com/watch?v=8OKTAedgFYg&list=PL8LLsJ8LoL9aHISvhk4Az4Tdeuqpquboi&index=17

약 1달 전부터 이직을 하기위해 FA로 나왔고, 다행히 몇 몇 회사들의 면접을 볼 수 있었다.

대부분은 면접에서 탈락했지만... 돌이켜 보면 준비가 미흡했던 것도 있고 경력직 면접인 만큼 고민하지 않고 개발했다면 뚫어내기가 쉽지 않은 경우가 많았던것 같다.

 

위기를 기회로 삼아, 면접때 받은 질문들을 바탕으로 기본기를 다져 보려고 한다.

 

오늘 알아볼 것은 with 문을 사용하는 것이다.

 

https://princeji-h.tistory.com/35

 

django - 동시성 문제 해결하기 (lock)

이번에 회사에서 사용하는 화상 회의툴을 줌으로 교체하는 업무를 맡아서 하던 중에, 튜터와 튜티가 동시에 수업 입장을 시도하는 경우에 각각 다른 방으로 인도하는 현상을 발견했다. 수업에

princeji-h.tistory.com

 

위 포스터를 보면 동시성 문제를 해결하기 위해 lock을 인위적으로 만들어 사용하게 되는데,

 

이 때 아래와 같이 lock을 취득하고 회수하는 함수를 로직의 시작과 끝에 호출하게 된다.

try:
    VideoConferenceLock.get_lock('session_id', session_id)
    with transaction.atomic():
        # 회의방  생성
        ...
	
    VideoConferenceLock.release_lock('session_id', session_id)
    return Response
except:
	VideoConferenceLock.release_lock('session_id', session_id)

만약에 위와 같은 로직을 다른 race condition이 발생하는 곳에서도 재사용 하려고 하면 어떻게 해야할까?

 

실제로 면접에서 들었던 질문인데, 제대로 답을 하지 못했다.

 

면접을 복기하면서 질문들을 정리하다가, 아 이거 with문을 만들어주면 될거같은데?? 하는 생각이 들었다.

 

찾아보니 with 문은 자원을 획득하고, 사용하고, 반납할때 유용하게 사용할 수 있는 문법  이라고 한다.

 

with문은 다음과 같이 작성하면 된다.

class Example:
    # 사용할 자원 생성 및 획득.
    def __enter__(self):
        print("start.")
        return self

    # 자원 사용.
    def print_number(self, num):
        print(f"Number: {num}")

    # 자원 반납 및 후처리.
    def __exit__(self, exc_type, exc_val, exc_tb):
        print("end.")

with Example() as ex:
    ex.print_number(42)
    ex.print_number(10)
    
    
# std out
start.
Number: 42
Number: 10
end.

 

위 내용을 만든 로직에 적용시켜 보자

class VideoConferenceLock:
    def __init__(self, key):
        self.key = key

    def __enter__(self):
        self.get_lock(self.key)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release_lock(self.key)

    def get_lock(self, session_id):
        key = f'zoom_session_id:{session_id}'
        while True:
            # 해당 key가 레디스에 없으면 lock  (1초마다 시도)
            if cache.get(key) is None:
                break
            time.sleep(1)
        cache.set(key, 1, 30)  # 30초 간 유효

    def release_lock(self, session_id):
        key = f'zoom_session_id:{session_id}'
        cache.delete(key)

VideoConferenceLock Class를 재정의한 이후, 아래와 같이 with 구문을 넣어서 get_lock 과 release_lock 을 자동으로 적용해줄 수 있다.

with VideoConferenceLock():
    try:
        with transaction.atomic():
            # 회의방  생성
            ...

        return Response
    except:
    	...

 

Reference

https://velog.io/@hyungraelee/Python-with

python 은 리스트 정렬을 위한 sort 메소드와 iterable한 객체 정렬을 위한 sorted 메소드를 제공한다.

 

두가지 메소드는 정렬 기준을 직접 세울 수 있게 해준다.

 

sorted(filter_schedules, key=(operator.attrgetter('status', 'start_date'))

위의 예문에서, filter_schedules는 쿼리 결과 오브젝트를 담고있는 list이다.

 

장고 orm 이 제공하는 order_by를 사용하면 정렬은 쉽게 해결되지만, 쿼리 이후에 뭔가 데이터 조작이 필요한 경우에는 쿼리를 가져올 때 order_by를 사용할수가 없다.

 

위와 같이 사용하면 오브젝트가 가진 status 칼럼을 기준으로 정렬한 이후에 start_date칼럼을 기준으로 정렬한 결과를 얻어낼 수 있다.

 

operator는 파이썬에서 제공하는 내장 함수이며 https://docs.python.org/ko/3.7/library/operator.html 에 자세히 설명되어 있다.

 

 

result['schedules'].sort(key=(operator.itemgetter('name')))
result['schedules'].sort(key=(operator.itemgetter('start_date')), reverse=True)
result['schedules'].sort(key=(operator.itemgetter('status')))

추가적으로,

 

sorted가 아닌 sort메소드를 사용할 때 위와 같이 한 번에 정렬하지 않더라도 이전의 정렬 상태에서 정렬이 이어서 진행된다.

 

2021.11.27 내용 추가

itemgetter, attrgetter

students = [
    ("jane", 22, 'A'),
    ("dave", 32, 'B'),
    ("sally", 17, 'B'),
]

위와 같은 형태에서 itemgetter(1)은 students의 item인 튜플의 2번째 요소로 정렬을 하겠다는 의미이다. 

students = [
    {"name": "jane", "age": 22, "grade": 'A'},
    {"name": "dave", "age": 32, "grade": 'B'},
    {"name": "sally", "age": 17, "grade": 'B'},
]

위와 같은 dict 형태에서도 동일하게 적용할 수 있다. 대신 itemgetter('age')처럼 딕셔너리의 키를 사용해야 한다.

class Student:
    def __init__(self, name, age, grade):
        self.name = name
        self.age = age
        self.grade = grade

 

students 리스트의 요소가 튜플이 아닌 Student 클래스의 객체일 경우에는 attrgetter('age')와 같이 attrgetter를 사용해야 한다.

References

https://blog.fakecoding.com/archives/python-multi-sort/

MSA 환경에서 서버 개발을 하다보면 다른 서버의 로직을 사용해야 하는 경우가 종종 발생한다.

 

해결 방법은 다음과 같이 2가지 이다.

1. 공통 서브 모듈을 만들어 서버들이 공유하도록 만든다. 

2. 다른 서버에 api 요청한다. 

 

필자의 생각으로는, 1번 방법은 로직 사용이 빈번할 경우에 사용하며 2번 방법은 자주 사용하지 않을 때 사용하는 것이 좋다고 생각한다.

 

서버끼리 api 콜을 하게 되면 그만큼 부하가 늘어나기도 하고, 로직이 복잡한 경우에 디버깅도 어려울 뿐만 아니라 테스트 코드 작성도 까다로워지기 때문이다.

 

    ...
    try:
        headers = {
            "ACCESS-TOKEN": request.META.get("HTTP_ACCESS_TOKEN"),
            "Content-Type": "application/json"
        }
        url = 'http://127.0.0.1:8001/schedule/v1/student/schedule'
        res = requests.post(url, data=json.dumps(request.data), timeout=5, headers=headers)
        status_code = res.status_code
        return status_code
    except:
        traceback.print_exc()

 

핵심 로직은 위와 같다.

 

기본적으로 requests 를 사용하며 다음과 같이 CRUD를 처리한다.

  • res = requests.get(url)
  • res = requests.post(url)
  • res = requests.delete(url)
  • res = requests.put(url)

 

응답 데이터 Response Content

status_code 응답 상태
headers headers정보
cookies cookies정보
encoding 데이터 인코딩
text 'str 타입의 데이터
content bytes 타입의 데이터
json() dict 타입의 데이터 일 경우 사용

 

 

References

https://pythonblog.co.kr/coding/10/

'Python' 카테고리의 다른 글

Python - with문 (feat.면접 회고)  (0) 2022.08.08
Python - 커스텀 정렬  (0) 2021.11.02
Python - 정렬 커스터마이징  (0) 2021.10.11
Python - 얕은 복사, 깊은 복사  (0) 2021.10.05
Python - list 특정 값 모든 원소 찾기  (0) 2021.09.19

파이썬은 자체적으로 정렬 함수를 제공하는데, 보통 sorted() 와 .sort()를 많이 사용한다.

temp = [3, 5, 6, 1, 0]

print(sorted(temp))

temp.sort()
print(temp)

위처럼 그냥 사용하면 두가지 모두 같은 결과를 내놓는다.

 

 

여기서 sorted()함수를 사용하면 원하는 기준을 세워 정렬할 수 있다.

 

 

글을 직접 쓰기 귀찮아서 파이썬 공식 홈페이지에서 복사해왔다.

 

결론적으로 보면, sorted 함수 내의 key에 해당하는 값에 원하는 함수를 넣어 그 방식대로 정렬을 명령할 수 있다.

 

내가 직접 사용한 예시는 다음과 같다.

 

https://programmers.co.kr/learn/courses/30/lessons/12915

 

코딩테스트 연습 - 문자열 내 마음대로 정렬하기

문자열로 구성된 리스트 strings와, 정수 n이 주어졌을 때, 각 문자열의 인덱스 n번째 글자를 기준으로 오름차순 정렬하려 합니다. 예를 들어 strings가 ["sun", "bed", "car"]이고 n이 1이면 각 단어의 인덱

programmers.co.kr

def solution(strings, n):
    answer = sorted(strings, key=lambda x : (x[n], x))
    return answer

 

간단하게 해결할 수 있다.

 

References

https://docs.python.org/ko/3/howto/sorting.html

Python 에서 List나 Query Set의 주소를 바꿔 복사를 해오려면 깊은 복사를 해야하는 줄 알았지만 얕은복사 만으로 커버가 가능했다.

안에 객체가 있는 경우에만 깊은 복사를 사용한다.

 

복사를 따지는 것은 mutable 객체일 때만 해당한다. immutable 객체는 값이 수정되는 것이 아니라 새로운 객체가 할당되는 것이기 때문

 

복사의 종류는 다음과 같이 3가지로 나눌 수 있다.

 

객체 복제

 

위와 같이 변수를 복사하면, 바라보는 객체가 동일하기 때문에 한 쪽에서 수정시 다른 쪽에서도 수정이 일어난다.

 

얕은 복사

얕은 복사를 하면 복합객체(리스트)는 별도로 생성하지만, 그 안에 들어가는 내용은 같은 객체를 참조함에 유의해야 한다. 참고로 copy.copy()이외에 slicing 으로도 얕은 복사를 할  수 있다.

 

깊은 복사

깊은 복사를 하면 내부에 있는 객체까지 다른 객체로 만든다. 

 

그러나 깊은 복사는 시간 복잡도가 높기 때문에 꼭 필요할 때만 사용해야 한다.

  • 10.59 sec (105.9us/itn) - copy.deepcopy(old_list)
  • 0.325 sec (3.25us/itn) - for item in old_list: new_list.append(item)
  • 0.217 sec (2.17us/itn) - [i for i in old_list] (a list comprehension)
  • 0.186 sec (1.86us/itn) - copy.copy(old_list)
  • 0.075 sec (0.75us/itn) - list(old_list)
  • 0.053 sec (0.53us/itn) - new_list = []; new_list.extend(old_list)
  • 0.039 sec (0.39us/itn) - old_list[:] (list slicing)

 

References

https://crackerjacks.tistory.com/14

https://blueshw.github.io/2016/01/20/shallow-copy-deep-copy/

필요한 경우가 꽤나 많은데, 깔끔하고 좋은 구문을 발견하여 정리한다.

 

Enumerate 함수를 사용한다.

result_list = [i for i, value in enumerate(target_list) if value == m]

 

참고로 Enumerate 함수는

  • 반복문 사용 시 몇 번째 반복문인지 확인할 때 사용한다.
  • 인덱스 번호와 컬렉션의 원소를 tuple형태로 반환한다.
>>> t = [1, 5, 7, 33, 39, 52]
>>> for p in enumerate(t):
...     print(p)
... 
(0, 1)
(1, 5)
(2, 7)
(3, 33)
(4, 39)
(5, 52)

 

References

https://wikidocs.net/16045

프로그래머스 문제 풀이를 보다가 아래와 같은 구문을 발견했다.

 

_reserve = [r for r in reserve if r not in lost]

찾아보니 if문의 조건을 만족하는 값(r)으로만 리스트를 구성하는 코드라고 한다.

 

심플하며 가독성이 좋아보여서 기록한다.

 

2021. 11. 01

추후에 찾아보니 List Comprehension 이라는 명칭이 붙어있었다. 아직 갈길이 멀다.

'Python' 카테고리의 다른 글

Python - 커스텀 정렬  (0) 2021.11.02
Django - 다른 서버에 api 요청하기  (0) 2021.10.14
Python - 정렬 커스터마이징  (0) 2021.10.11
Python - 얕은 복사, 깊은 복사  (0) 2021.10.05
Python - list 특정 값 모든 원소 찾기  (0) 2021.09.19

+ Recent posts