Copilot example

Exported source
# importing dependencies
import re
import pandas as pd
from pandas import DataFrame
from pathlib import Path
from spannerlib.utils import load_env
from spannerlib import get_magic_session,Session,Span
# load openAI api key from env file
load_env()

Motivation

In the previous tutorial, we showed how Spannerlib makes composing control logic for LLM agents easy and leads to elegant code. In this tutorial we will show the power of combining the compositional abilities of a language like Spannerlog with Document Spanners. Document Spanners allow us to seamlessly combine deep-learning-based ML techniques with structured text mining.

Our use case will be building a copilot-like code documentation agent with a smarter input context as the prompt to the LLM. Normally, when running a model like Copilot in an IDE such as PyCharm or VS Code, the prompt sent to the LLM consists of:

  • Our question.
  • The last \(k\) files in our editor history.

In our agent, instead, we would like to leverage our understanding of the structure of code to:

  • Limit our context only to relevant code snippets from our code base
    • Avoiding large prompts filled with irrelevant data
  • Not limit ourselves to the last \(k\) files in our editor history

Problem definition

Given:

  • A collection of python files.
  • A cursor position in a python file.

Return:

  • A doc string of the python function that wraps the position of our cursor.

We will reuse our llm and format ie functions from the basic tutorial and introduce some new ie functions, namely:

  • ast_xpath(file,xpath_query)->(ast_node) that allows us to select ast nodes using xpath queries.
  • ast_to_span(file,ast_node)->(ast_span) that allows us to get the span of code described by an ast node of a given file.

We will also use ie functions from the standard library:

  • expr_eval(expression_template,val_1,...,val_n)->(expression_result) that evaluates a python expression given as a template string in a printf-like format.
    • used to generate small ie functions easily
    • the usage will become clear when writing our agent below.
  • Span comparison ie functions such as span_contained(span1,span2)->(bool) which return true if span1 is a subspan of span2.
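
To make the containment check concrete, here is a minimal sketch in plain Python. It assumes spans are half-open character intervals [start, end) over a named document, as the span printouts later in this tutorial suggest. The SimpleSpan class and is_contained function are hypothetical stand-ins for illustration, not spannerlib's Span or span_contained, whose implementation may differ.

```python
from typing import NamedTuple

class SimpleSpan(NamedTuple):
    """Hypothetical stand-in for a span: a half-open interval [start, end) in a named document."""
    doc: str
    start: int
    end: int

def is_contained(inner: SimpleSpan, outer: SimpleSpan) -> bool:
    """True if `inner` lies entirely within `outer` in the same document."""
    return (
        inner.doc == outer.doc
        and outer.start <= inner.start
        and inner.end <= outer.end
    )

# Offsets taken from the example file used later in this tutorial
func_f = SimpleSpan("example_code.py", 0, 19)
func_g = SimpleSpan("example_code.py", 22, 54)
cursor = SimpleSpan("example_code.py", 16, 17)

print(is_contained(cursor, func_f))  # True
print(is_contained(cursor, func_g))  # False
```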

We will also add a text-specific aggregation function:

  • lex_concat(strings)->(string) takes a set of strings and concatenates them in lexicographic order.
    • We must sort them lexicographically since aggregation functions in Datalog must operate under set semantics
    • This is a limitation of the declarative language we chose for this demo, but it can be overcome by using the spannerlib framework as a callback extension of a richer declarative language like SQL.
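
The following short sketch illustrates why the sort matters: a set has no inherent order, so sorting before joining makes the aggregate depend only on which strings are present. The function body matches the lex_concat defined later in this tutorial; the input strings are made up for illustration.

```python
def lex_concat(strings):
    # Sort first so the result does not depend on the order of the inputs
    return '\n'.join(sorted(str(s) for s in strings))

a = lex_concat(["def g(): ...", "def a(): ..."])
b = lex_concat(["def a(): ...", "def g(): ..."])

print(a == b)  # True
print(a)
```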

TLDR

%%spannerlog
# find all function definitions in the code base
FuncDefSpan(span,name)<-
    Files(text),
    ast_xpath(text, "//FunctionDef")->(node),
    ast_to_span(text,node)->(span),
    expr_eval("{0}.name",node)->(name).

# find all function calls in the code base
FuncCallSpan(span,name)<-
    Files(text),
    ast_xpath(text, "//Call/func/Name")->(node),
    ast_to_span(text,node)->(span),
    as_str(span)->(name).

# find the function whose span contains the cursor
CursorWrappingFunc(cursor,name)<-
    Cursors(cursor),
    FuncDefSpan(span,name),
    span_contained(cursor,span)->(True).

# get all functions who had a call to called_name inside them
Mentions(lex_concat(caller_span),called_name)<-
    FuncCallSpan(called_span,called_name),
    FuncDefSpan(caller_span,caller_name),
    span_contained(called_span,caller_span)->(True).

model = 'gpt-3.5-turbo'
# compose current function and all mentions into a prompt and
# ask the model to generate the documentation for the function
DocumentFunction(cursor,answer)<-
    CursorWrappingFunc(cursor,name),
    Mentions(mentions,name),
    FuncDefSpan(def_span,name),
    as_str(def_span)->(def_string),
    format($func_document_template,mentions,def_string)->(prompt),
    llm($model,prompt)->(answer).

A full walkthrough of our implementation.

Importing code from previous tutorial

sess = get_magic_session()
sess.register('llm',llm,[str,str],[str])
sess.register('format', format_ie, string_schema,[str])

Implementing novel Callback functions

If the implementation details are not of interest, feel free to move to the next section.

In order to analyze the structure of the code, we will be using python’s ast module. We will write a very generic ie function that takes a piece of code and an xpath query string, and returns all matches of the query over the ast of the given code.

To do so we will use the pyastgrep library, which allows us to look for xpath matches in python asts. We will write a modified version of its main function that returns the matching ast nodes, which we can later convert to Spans.
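
For intuition about what the two xpath queries used in this tutorial select, here is a rough standard-library-only analog. It is a sketch that swaps XPath for ast.walk with isinstance filters, not how pyastgrep works internally, and the source snippet is made up for illustration.

```python
import ast

source = """\
def f(x, y):
    return x + y

def g(x, y):
    return f(x, y) ** 2
"""

tree = ast.parse(source)

# Rough analog of the xpath query '//FunctionDef'
func_defs = [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]

# Rough analog of '//Call/func/Name': calls whose callee is a bare name
call_names = [
    n.func for n in ast.walk(tree)
    if isinstance(n, ast.Call) and isinstance(n.func, ast.Name)
]

# ast.walk does not guarantee an order, so sort before printing
print(sorted(d.name for d in func_defs))  # ['f', 'g']
print([c.id for c in call_names])         # ['f']
```

The XPath approach is more flexible, since a new structural query is just a new string rather than a new Python filter.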



ast_xpath

 ast_xpath (py_str, xpath_query)
Exported source
import ast
from functools import cache
from pyastgrep.search import search_python_files,Match
from pyastgrep.asts import ast_to_xml
from lxml import etree
Exported source
@cache
def _py_to_xml(py:str):
    # returns a tuple: (xml tree, ast tree, mapping from xml nodes to ast nodes)
    ast_tree = ast.parse(py)
    node_mappings = {}
    xml_tree = ast_to_xml(ast_tree, node_mappings)
    return xml_tree,ast_tree,node_mappings

def _xml_to_string(xml_tree):
    return etree.tostring(xml_tree, pretty_print=True).decode('utf-8')

def _print_file_xml(file_path):
    text = Path(file_path).read_text()
    xml_tree,_,_ = _py_to_xml(text)
    print(_xml_to_string(xml_tree))


def _ast_to_string(ast_tree):
    if isinstance(ast_tree,ast.AST):
        return ast.unparse(ast_tree)
    else:
        return ast_tree

def ast_xpath(py_str,xpath_query):
    if isinstance(py_str,Path):
        py_str = py_str.read_text()
    if isinstance(py_str,Span):
        py_str = str(py_str)
    xml_tree,ast_tree,node_mappings = _py_to_xml(py_str)
    xml_matches = xml_tree.xpath(xpath_query)
    ast_matches = [node_mappings[match] if match in node_mappings else match for match in xml_matches]
    return ast_matches
code_file = Path('copilot_data/example_code.py')
code_text = code_file.read_text()
print(code_text)
def f(x,y):
    x+y 

def g(x,y):
    return f(x,y)**2

class A:
    def __init__(self, x):
        self.x = x
    def method(self, y):
        return f(self.x, y)

print(f(2,3))
x_t,a_t,n_m = _py_to_xml(code_text)
_print_file_xml(code_file)
# we do not present the output xml here as it is too long
print(_ast_to_string(a_t))
def f(x, y):
    x + y

def g(x, y):
    return f(x, y) ** 2

class A:

    def __init__(self, x):
        self.x = x

    def method(self, y):
        return f(self.x, y)
print(f(2, 3))
for match in ast_xpath(code_file,'//FunctionDef'):
    print(_ast_to_string(match))

print("="*80)
for match in ast_xpath(code_text,'//FunctionDef/@name'):
    print(_ast_to_string(match))
def f(x, y):
    x + y
def g(x, y):
    return f(x, y) ** 2
def __init__(self, x):
    self.x = x
def method(self, y):
    return f(self.x, y)
================================================================================
f
g
__init__
method


ast_to_span

 ast_to_span (string, node)

Given a node of an ast and the file (or string) it was parsed from, returns the location of the node in the file as a Span object.

Exported source
@cache
def _get_lines(path):
    # accepts either a Path to read from or the text itself
    if isinstance(path,Path):
        return tuple(path.read_text().split('\n'))
    else:
        return tuple(path.split('\n'))

def _get_character_position(path, line_number, column_offset):
    """gets a character position from a line number and column offset"""
    lines = _get_lines(path)
    if line_number < 1 or line_number > len(lines):
        raise ValueError("Invalid line number")
    line = lines[line_number - 1]
    if column_offset < 0 or column_offset > len(line):
        raise ValueError("Invalid column offset")
    return sum(len(lines[i]) + 1 for i in range(line_number - 1)) + column_offset

def ast_to_span(string,node):
    """given a node <node> of an ast from file <path>,
    returns the location of the node in the file as a Span object"""
    if isinstance(string,Path):
        text = string.read_text()
        name = string.name
    else:
        text = string
        name = None
    start = _get_character_position(str(text),node.lineno,node.col_offset)
    if hasattr(node,'end_lineno') and hasattr(node,'end_col_offset'):
        end = _get_character_position(str(text),node.end_lineno,node.end_col_offset)
    else:
        end = start + len(ast.unparse(node))
    return [Span(text,start,end,name=name)]
matches = ast_xpath(code_text,'//FunctionDef')
m = matches[0]
span = ast_to_span(code_file,m)[0]
span,str(span)
([@example_code.py,0,19) "def f(x,y)...", 'def f(x,y):\n    x+y')
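
The offsets in the span above can be reproduced with the standard library alone. The sketch below mirrors the line and column arithmetic of _get_character_position on the first lines of the example file; char_offset is a hypothetical helper name used only for this illustration.

```python
import ast

code = "def f(x,y):\n    x+y \n\ndef g(x,y):\n    return f(x,y)**2\n"

def char_offset(text: str, lineno: int, col: int) -> int:
    """Convert a 1-based line number and 0-based column to an offset into `text`."""
    lines = text.split('\n')
    # +1 per preceding line accounts for the newline removed by split
    return sum(len(line) + 1 for line in lines[:lineno - 1]) + col

node = ast.parse(code).body[0]  # the FunctionDef node for f
start = char_offset(code, node.lineno, node.col_offset)
end = char_offset(code, node.end_lineno, node.end_col_offset)

print(start, end)             # 0 19
print(repr(code[start:end]))  # 'def f(x,y):\n    x+y'
```

One caveat: the ast module reports col_offset and end_col_offset as UTF-8 byte offsets, so this arithmetic matches character positions only when the lines involved are plain ASCII.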


lex_concat

 lex_concat (strings)
Exported source
def lex_concat(strings):
    return '\n'.join(sorted([str(s) for s in strings]))
# note that we use the ast node class directly in spannerlib since we want to access node attributes as well
sess.register('ast_xpath',ast_xpath,[(str,Path,Span),str],[ast.AST])
sess.register('ast_to_span',ast_to_span,[(str,Span,Path),ast.AST],[Span])
sess.register_agg('lex_concat',lex_concat,[(str,Span)],[str])

Using our ie functions

# non-primitive variables cannot be initialized from within spannerlog, so we define
# a path variable from the outside.
sess.import_var('code_file',code_file)
ExampleAST(span,string)<-
    ast_xpath($code_file,'//FunctionDef')->(node),
    ast_to_span($code_file,node)->(span),
    as_str(span)->(string).

?ExampleAST(span,string)
'?ExampleAST(span,string)'
span string
[@example_code.py,0,19) "def f(x,y)..." def f(x,y): x+y
[@example_code.py,22,54) "def g(x,y)..." def g(x,y): return f(x,y)**2
[@example_code.py,69,110) "def __init..." def __init__(self, x): self.x = x
[@example_code.py,115,163) "def method..." def method(self, y): return f(self.x, y)

Bringing in data

To keep the outputs readable, we will only load a single python file

example_files = pd.DataFrame(
    [(Span(code_file),)]
)
example_files.map(repr)
0
0 [@example_code.py,0,178) "def f(x,y)..."
print(code_file.read_text())
def f(x,y):
    x+y 

def g(x,y):
    return f(x,y)**2

class A:
    def __init__(self, x):
        self.x = x
    def method(self, y):
        return f(self.x, y)

print(f(2,3))

We will simulate a cursor position inside the function f

cursors =pd.DataFrame([(Span(code_file,16,17),)])
cursors.map(repr)
0
0 [@example_code.py,16,17) "x"

We import this data to our session.

sess.import_rel('Files',example_files)
sess.import_rel('Cursors',cursors)

And now we can incrementally build our rules from the bottom up

# get all spans of function definitions and their name
# note we use expr_eval to get the name attribute of the ast Node we assigned to the free variable 'node'.
FuncDefSpan(span,name)<-
    Files(text),
    ast_xpath(text, "//FunctionDef")->(node),
    ast_to_span(text,node)->(span),
    expr_eval("{0}.name",node)->(name).

?FuncDefSpan(span,name)
'?FuncDefSpan(span,name)'
span name
[@example_code.py,0,19) "def f(x,y)..." f
[@example_code.py,22,54) "def g(x,y)..." g
[@example_code.py,69,110) "def __init..." __init__
[@example_code.py,115,163) "def method..." method
# get all spans of function calls and their names
FuncCallSpan(span,name)<-
    Files(text),
    ast_xpath(text, "//Call/func/Name")->(node),
    ast_to_span(text,node)->(span),
    as_str(span)->(name).

?FuncCallSpan(span,name)
'?FuncCallSpan(span,name)'
span name
[@example_code.py,45,46) "f" f
[@example_code.py,151,152) "f" f
[@example_code.py,165,170) "print" print
[@example_code.py,171,172) "f" f
# we compute the func wrapping a cursor by checking which function def span contains our cursor
CursorWrappingFunc(cursor,name)<-
    Cursors(cursor),
    FuncDefSpan(span,name),
    span_contained(cursor,span)->(True).

?CursorWrappingFunc(cursor,name)
'?CursorWrappingFunc(cursor,name)'
cursor name
[@example_code.py,16,17) "x" f
# we get all mentions of a function by looking for function calls of it that are sub spans of func definitions
# we aggregate our mentions using lex_concat to get a single mention context per function.
Mentions(lex_concat(caller_span),called_name)<-
    FuncCallSpan(called_span,called_name),
    FuncDefSpan(caller_span,caller_name),
    span_contained(called_span,caller_span)->(True).

?Mentions(mentions,func)
'?Mentions(mentions,func)'
mentions func
def g(x,y): return f(x,y)**2 def method(self, y): return f(self.x, y) f
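
As a cross-check of the Mentions rule, the same join and aggregation can be sketched imperatively with only the standard library. This is an illustrative analog under the assumption that containment is decided by source positions; the helper names pos, contains, and mentions are hypothetical and not part of spannerlib.

```python
import ast

code = '''\
def f(x,y):
    x+y

def g(x,y):
    return f(x,y)**2

class A:
    def __init__(self, x):
        self.x = x
    def method(self, y):
        return f(self.x, y)

print(f(2,3))
'''

tree = ast.parse(code)
func_defs = [n for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]
calls = [n.func for n in ast.walk(tree)
         if isinstance(n, ast.Call) and isinstance(n.func, ast.Name)]

def pos(node):
    """Start and end of a node as (line, column) pairs, which compare lexicographically."""
    return ((node.lineno, node.col_offset), (node.end_lineno, node.end_col_offset))

def contains(outer, inner):
    (o_start, o_end), (i_start, i_end) = pos(outer), pos(inner)
    return o_start <= i_start and i_end <= o_end

def mentions(called_name):
    # Join calls with their enclosing definitions, collect the callers as a set,
    # then aggregate the set in sorted order (the role lex_concat plays above)
    callers = {
        ast.get_source_segment(code, d)
        for d in func_defs for c in calls
        if c.id == called_name and contains(d, c)
    }
    return '\n'.join(sorted(callers))

print(mentions('f'))
```

The module-level call print(f(2,3)) is not reported, since it is not contained in any function definition. The declarative rule expresses the same logic in three body atoms.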

Now, to piece this together into an LLM prompt we can call, let's define our prompt template:

func_document_template = """
system: based on the following context:
{}
Explain the following function:
{}
In the format of a doc string.
"""
sess.import_var('func_document_template',func_document_template)
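
The template has two positional placeholders: the first receives the mention context and the second the function definition. The sketch below fills it with Python's built-in str.format, assuming the format ie function from the basic tutorial behaves the same way for positional arguments; the two input strings are shortened examples.

```python
func_document_template = """
system: based on the following context:
{}
Explain the following function:
{}
In the format of a doc string.
"""

# Example inputs, standing in for the values produced by the rules above
mentions = "def g(x,y):\n    return f(x,y)**2"
def_string = "def f(x,y):\n    x+y"

prompt = func_document_template.format(mentions, def_string)
print(prompt)
```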

And just like in our basic agent tutorial, we get our strings from our low-level rules and compose them using the format ie function.

model = 'gpt-3.5-turbo'
DocumentFunctionPrompt(cursor,prompt)<-
    CursorWrappingFunc(cursor,name),
    Mentions(mentions,name),
    FuncDefSpan(def_span,name),
    as_str(def_span)->(def_string),
    format($func_document_template,mentions,def_string)->(prompt).

?DocumentFunctionPrompt(cursor,prompt)

DocumentFunction(cursor,answer)<-
    DocumentFunctionPrompt(cursor,prompt),
    llm($model,prompt)->(answer).

?DocumentFunction(cursor,answer)
'?DocumentFunctionPrompt(cursor,prompt)'
cursor prompt
[@example_code.py,16,17) "x" system: based on the following context: def g(x,y): return f(x,y)**2 def method(self, y): return f(self.x, y) Explain the following function: def f(x,y): x+y In the format of a doc string.
'?DocumentFunction(cursor,answer)'
cursor answer
[@example_code.py,16,17) "x" """ This function calculates the sum of two inputs x and y. """

Putting all of our spannerlog together, we get:

FuncDefSpan(span,name)<-
    Files(text),
    ast_xpath(text, "//FunctionDef")->(node),
    ast_to_span(text,node)->(span),
    expr_eval("{0}.name",node)->(name).

FuncCallSpan(span,name)<-
    Files(text),
    ast_xpath(text, "//Call/func/Name")->(node),
    ast_to_span(text,node)->(span),
    as_str(span)->(name).

CursorWrappingFunc(cursor,name)<-
    Cursors(cursor),
    FuncDefSpan(span,name),
    span_contained(cursor,span)->(True).

Mentions(lex_concat(caller_span),called_name)<-
    FuncCallSpan(called_span,called_name),
    FuncDefSpan(caller_span,caller_name),
    span_contained(called_span,caller_span)->(True).

model = 'gpt-3.5-turbo'
DocumentFunctionPrompt(cursor,prompt)<-
    CursorWrappingFunc(cursor,name),
    Mentions(mentions,name),
    FuncDefSpan(def_span,name),
    as_str(def_span)->(def_string),
    format($func_document_template,mentions,def_string)->(prompt).

DocumentFunction(cursor,answer)<-
    DocumentFunctionPrompt(cursor,prompt),
    llm($model,prompt)->(answer).

Note how short and elegant a complex pipeline can be when we decompose our code into:

  • powerful and generic callbacks
  • declarative compositional logic

Note also the ease with which we can combine formal IE extractions and LLMs to get the best of both:

  • structured analysis
  • NLP via LLMs